【问题标题】:Spring kafka consumer, seek offset at runtime?Spring kafka消费者,在运行时寻求偏移量?
【发布时间】:2017-08-17 05:41:46
【问题描述】:

我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,它也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。

但如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。

KafkaMessageListenerContainer 可以做到吗?

【问题讨论】:

  • 您在应用程序逻辑之后提交偏移量。因此,如果您在应用程序逻辑失败的情况下不提交偏移量,则偏移量将不会向前移动,您将再次处理相同的消息。这能解决您的问题吗?
  • @yaswanth 不,我猜它不像那样工作。在开始测试之前,我的假设与您相似。我启用了 ENABLE_AUTO_COMMIT_CONFIG - false 和 AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE 的容器属性与 AcknowledgeingMessageListener。我将记录 1 和记录 2 发送到主题,记录 1- 应用程序逻辑失败(我不回复),此时下一次消费者轮询应该再次获取记录 1,但我得到记录 2。如果这是配置参数修复的问题请告诉我!
  • 你是对的!到目前为止,我一直处于错误的假设之中。

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

Seeking to a Specific Offset

为了寻找,你的监听器必须实现ConsumerSeekAware,它有以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

第一个在容器启动时调用;在初始化后的某个任意时间进行搜索时,应使用此回调。您应该保存对回调的引用;如果您在多个容器(或ConcurrentMessageListenerContainer)中使用相同的侦听器,则应将回调存储在ThreadLocal 或其他由侦听器线程键入的结构中。

【讨论】:

  • 我正在使用 KafkaListenerContainerFactory、KafkaListenerEndpointRegistry 和 MethodKafkaListenerEndpoint。无法找到如何使用 ConsumerSeekAware,因为我没有直接编写 Listener 类。
  • 不要在 cmets 中提出新问题。在启动容器之前,您必须创建自己的消费者(具有相同的组 ID)并在那里进行搜索。
  • 明白你的意思@Gary。并感谢您的回答。
【解决方案2】:

please refer to Spring Kafka document

“对于现有的组 ID,初始偏移量是该组 ID 的当前偏移量”。

而且 Confluent 还提到: “如果消费者崩溃或被关闭,它的分区将被重新分配给另一个成员,该成员将从每个分区的最后提交的偏移量开始消费”

这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您手动提交,您也不需要“寻找失败的偏移量”

ENABLE_AUTO_COMMIT_CONFIG = 假

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-01
    • 2018-02-03
    • 1970-01-01
    • 2020-08-08
    相关资源
    最近更新 更多