【问题标题】:Commiting offset after consumer error handling in spring-kafka在spring-kafka中消费者错误处理后提交偏移量
【发布时间】:2018-12-17 16:32:27
【问题描述】:

我不完全理解消费者错误处理如何与提交偏移量和 akcMode 一起工作,以及它如何因错误停止容器而受到影响(使用 spring-kafka 1.3.*)。

假设我有两个消费者(使用两个分区),他们在轮询时都从他们的分区中获取 5 个事件 (max.records.per.poll=5)。

第一个消费者 - 第一个事件处理正常,处理第二个事件失败 - 所以在错误处理程序中我调用kafkaListenerEndpointRegistry.stop(),但由于实施了停止它只是停止消费者轮询,两个消费者仍然完成处理他们当前的批次。因此,第一个消费者处理事件 3、4、5(所有这些事件都没有错误地处理),假设第二个消费者在第 4 个事件上失败(并且事件 1、2、3、5 处理正常)。 我的问题是将为每个消费者提供哪些补偿?

我的理解是:

  • 当我将AckMode.RECORD/BATCHackOnError 结合使用时- 将为两个消费者提交最新的偏移量(5)
  • 当我将AckMode.RECORD/BATCH!ackOnError 结合使用时 - 还将为两个消费者提交最新的偏移量 - 因为虽然在处理批处理期间某些事件失败,但批处理中最新处理的事件是正常的,因此最新处理的事件偏移量获胜.

我的理解正确吗?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    你的理解是正确的;当您停止容器时,您还应该向侦听器发出信号,告知它也需要拒绝任何剩余的记录,这样它们的偏移量就不会被提交。

    我们正在考虑添加一个stopNow() 方法,这将阻止向侦听器发送额外的记录。

    在 2.0 中,我们添加了 RemainingRecordsErrorHandler(以及一个实现,SeekToCurrentErrorHandler)。当容器检测到这样的错误处理程序时,它将剩余的记录呈现给错误处理程序而不是侦听器。

    SeekToCurrentErrorHandler 将所有主题/分区寻找到未处理的偏移量(包括失败的记录),以便在下一次轮询时检索它们。

    自定义实现可能会寻找剩余的记录,但会将失败的记录发送到死信主题(或以其他方式处理)。

    也就是说,stopNow() 对大多数人来说可能更容易处理,但它可能只是 2.2 的功能; 1.3.x 用户需要在失败后丢弃/拒绝未处理的记录。

    您也可以使用RetryingMessageListenerAdapter(如果使用@KafkaListener,则启用重试),它将根据其重试配置重试交付,根本不涉及Kafka。失败的记录可以通过RecoveryCallback 在重试用尽并提交其偏移量后处理;在这种情况下不需要停止容器。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-12-14
    • 1970-01-01
    • 2022-11-13
    • 2020-08-08
    • 1970-01-01
    • 2022-11-11
    • 2021-11-10
    • 2017-08-22
    相关资源
    最近更新 更多