【问题标题】:Kafka Streams commits offset when producer throws an exception当生产者抛出异常时,Kafka Streams 提交偏移量
【发布时间】:2020-05-24 05:36:06
【问题描述】:

在我的 Kafka 流应用程序中,我有一个处理器,它计划每 60 秒产生一次输出消息。输出消息由来自单个输入主题的消息构建。有时会发生输出消息大于代理的配置限制(默认为 1MB)。抛出异常并关闭应用程序。提交间隔设置为默认值(60 秒)。

在这种情况下,我希望在下一次运行中,在崩溃前 60 秒内消耗的所有消息都会被重新消耗。但实际上这些消息的偏移量已提交,并且在下次运行时不会再次处理这些消息。

阅读similar questions 的答案,在我看来,不应提交偏移量。当我将提交间隔增加到 120 秒(处理器仍然每 60 秒打断一次)时,它会按预期工作并且不会提交偏移量。

我正在使用默认处理保证,但我也尝试过exactly_once。两者的结果相同。从处理器调用context.commit() 似乎对此问题没有影响。

我在这里做错了吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    Kafka Streams 中Processor 的约定是,您在process() 返回之前完全 处理了输入记录和forward() 所有相应的输出消息。 -- 这个合约意味着Kafka Streams可以在process()返回后提交相应的offset。

    您似乎在内存中的process() 中“缓冲”了消息,以便稍后发出它们。这违反了本合同。如果您想“缓冲”消息,您应该将状态存储附加到Processor 并将所有这些消息放入存储中(参见https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores)。该商店由 Kafka Streams 为您管理,并且具有容错性。这样,在发生错误后,状态将被恢复,并且您不会丢失任何数据(即使输入消息没有被重新处理)。

    我怀疑将提交间隔设置为 120 秒实际上在所有情况下都能按预期工作,因为在提交发生时间和调用标点符号之间没有对齐。

    【讨论】:

      【解决方案2】:

      其中一些取决于您使用的客户端以及它是否基于 librdkafka。 一些答案还取决于您如何“循环”“轮询”方法。一个典型的例子类似于https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html“自动偏移提交”下的代码 但这假设了一个相当快速的轮询循环(100 毫秒 + 处理时间)和一个 auto.commit.timeout.ms 在 1000 毫秒(默认通常是 5000 毫秒)。

      如果我没看错你的问题,你似乎每 60 秒消费一次消息?

      需要注意的是,kafka 客户端的行为与调用poll 的频率密切相关(一些库会将 poll 包装在类似“Consume”方法的内部)。频繁调用 poll 很重要,以便让经纪人看起来“活跃”。如果您不至少每max.poll.interval.ms 轮询一次(默认为 5 分钟),您将获得其他异常。这可能会导致客户被踢出他们的消费群体。

      无论如何,直截了当...auto.commit.interval.ms 只是一个最大值。如果消息已被接受/确认或已使用 StoreOffset,则在轮询时,客户端可以决定更新代理上的偏移量。可能是由于客户端缓冲区大小受到影响或其他一些语义。

      要查看的另一件事(尤其是如果使用基于 librdkafka 的客户端。其他人也有类似的东西)是 enable.auto.offset.store(默认为 true)这将“自动存储提供给应用程序的最后一条消息的偏移量”,因此每次轮询/消费时来自客户端的消息将 StoreOffset。如果您还使用 auto.commit,那么您的偏移量可能会以您意想不到的方式移动。

      有关 librdkafka 的完整配置集,请参阅 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

      有很多/很多消费/确认的方式。我认为对于您的情况,配置页面上max.poll.interval.ms 的评论可能是相关的。

      " 注意:对于长时间处理的应用,建议设置enable.auto.offset.store=false,然后在消息处理后显式存储偏移量(使用offsets_store()) "

      抱歉,这个“答案”有点啰嗦。我希望有一些线程供您继续学习。

      【讨论】:

      • 这个答案不是为 Kafka Streams 量身定做的,没有用处。
      猜你喜欢
      • 1970-01-01
      • 2022-06-23
      • 2017-08-22
      • 1970-01-01
      • 2017-05-27
      • 2021-06-18
      • 1970-01-01
      • 1970-01-01
      • 2017-12-10
      相关资源
      最近更新 更多