【发布时间】:2020-12-30 08:42:33
【问题描述】:
版本 flink(1.11.3)、kafka(2.1.1)
我的 flink 数据管道是 kafka(source) -> flink -> kafka(sink)。
当我首先提交作业时,它运行良好。 但是jobmanager或者taskmanagers失败后,如果重启,就会出现异常
2020-12-31 10:35:23.831 [objectOperator -> Sink: objectSink (1/1)] WARN o.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - 遇到错误 org.apache.kafka.common .errors.InvalidTxnStateException:生产者尝试在无效状态下进行事务操作。在恢复事务 KafkaTransactionState [transactionalId=objectOperator -> Sink: objectSink-bcabd9b643c47ab46ace22db2e1285b6-3, producerId=14698, epoch=7] 时。大概这个事务之前已经提交过 2020-12-31 10:35:23.919 [userOperator -> Sink: userSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - userOperator -> Sink: userSink (1/1) (2a5a171aa335f444740b4acfc7688d7c)从 RUNNING 切换到 FAILED。 org.apache.kafka.common.errors.InvalidPidMappingException:生产者尝试使用当前未分配给其事务 ID 的生产者 ID。 2020-12-31 10:35:24.131 [objectOperator -> Sink: objectSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - objectOperator -> Sink: objectSink (1/1) (07fe747a81b31e016e88ea6331b31433)从 RUNNING 切换到 FAILED。 org.apache.kafka.common.errors.UnsupportedVersionException:尝试在版本 1 中写入非默认 producerId
不知道为什么会出现这个错误。
我的卡夫卡生产者代码
Properties props = new Properties();
props.setProperty("bootstrap.servers", servers);
props.setProperty("transaction.timeout.ms", "30000");
FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
topic,((record, timestamp) -> new ProducerRecord<>(
topic
, Longs.toByteArray(record.getUserInKey())
, JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
我认为不是版本问题。 好像没有人遇到过和我一样的错误
【问题讨论】:
标签: apache-kafka apache-flink flink-streaming