【问题标题】:Flink Kafka Sink org.apache.kafka.common.errors.UnsupportedVersionException ERRORFlink Kafka Sink org.apache.kafka.common.errors.UnsupportedVersionException ERROR
【发布时间】:2020-12-30 08:42:33
【问题描述】:

版本 flink(1.11.3)、kafka(2.1.1)

我的 flink 数据管道是 kafka(source) -> flink -> kafka(sink)。

当我首先提交作业时,它运行良好。 但是jobmanager或者taskmanagers失败后,如果重启,就会出现异常

2020-12-31 10:35:23.831 [objectOperator -> Sink: objectSink (1/1)] WARN o.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - 遇到错误 org.apache.kafka.common .errors.InvalidTxnStateException:生产者尝试在无效状态下进行事务操作。在恢复事务 KafkaTransactionState [transactionalId=objectOperator -> Sink: objectSink-bcabd9b643c47ab46ace22db2e1285b6-3, producerId=14698, epoch=7] 时。大概这个事务之前已经提交过 2020-12-31 10:35:23.919 [userOperator -> Sink: userSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - userOperator -> Sink: userSink (1/1) (2a5a171aa335f444740b4acfc7688d7c)从 RUNNING 切换到 FAILED。 org.apache.kafka.common.errors.InvalidPidMappingException:生产者尝试使用当前未分配给其事务 ID 的生产者 ID。 2020-12-31 10:35:24.131 [objectOperator -> Sink: objectSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - objectOperator -> Sink: objectSink (1/1) (07fe747a81b31e016e88ea6331b31433)从 RUNNING 切换到 FAILED。 org.apache.kafka.common.errors.UnsupportedVersionException:尝试在版本 1 中写入非默认 producerId

不知道为什么会出现这个错误。

我的卡夫卡生产者代码

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

我认为不是版本问题。 好像没有人遇到过和我一样的错误

【问题讨论】:

    标签: apache-kafka apache-flink flink-streaming


    【解决方案1】:

    每个生产者在初始化时都被分配了一个唯一的 PID。此 PID 对应用程序是透明的,根本不会向用户公开。对于给定的PID,序号会从0开始递增,每个Topic-Partition都会有一个独立的序号。 Producer发送数据时,会为每个msg标识一个序列号,Server会以此来验证数据是否重复。这里的PID是全局唯一的,Producer失败后重启后会分配一个新的PID。这也是无法跨会话实现幂等性的原因之一。

    如果从savepoint恢复,会使用之前的producerId,一个新的session会产生1000个新的producerId(这些id贯穿整个session,相当于默认值),所以是non-default的

    p>

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-08
      • 2018-11-04
      • 1970-01-01
      • 1970-01-01
      • 2020-07-31
      • 2022-06-30
      • 2020-12-17
      相关资源
      最近更新 更多