【问题标题】:Flink Kafka Source Asynchronous auto-commit of offsets failedFlink Kafka Source 异步自动提交偏移量失败
【发布时间】:2022-10-04 19:52:32
【问题描述】:

Flink 版本:v1.15.2

我在使用 Apache Flink 时遇到了问题:当 Flink 任务的 Kafka Source 表使用与其他 Kafka 消费者相同的组 ID 时,Flink 无法提交偏移量。问题场景描述如下:

  1. 我有一个 Java 应用程序,它是 Kafka 消费者,使用消费者组“TopicA”来使用来自主题“topic_a”的数据

  2. 有一个Flink任务,它的Kafka Source表使用的Kafka消费组也是'TopicA',但是消费的是'topic_b'主题的数据

    此时Flink任务的日志信息中出现如下错误: Asynchronous auto-commit of offsets {topic_b-0=OffsetAndMetadata{offset=xxx, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 返回的批次的最大大小来解决这个问题。

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    Java 应用程序使用 Spring-Kafka 框架,该框架默认使用 subcribe() 方法。

    Flink Kafka 连接器使用 assign() 方法。 它们使用相同的消费者组 ID,因此在 Flink 中提交 Offset 时不可避免地会报错。

    解决方案是在 KafkaListener 注解上指定需要消费的分区。 例如:

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_a", partitions = {"0"})})
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-12-10
      • 1970-01-01
      • 2017-06-24
      • 2021-04-06
      • 2017-06-07
      • 1970-01-01
      • 2019-07-30
      • 2020-08-08
      相关资源
      最近更新 更多