【问题标题】:Re-consume messages for which offset was not commited重新消费未提交偏移量的消息
【发布时间】:2025-12-10 10:20:07
【问题描述】:

我有一个自定义的 Kafka 消费者,我用它来向 REST API 发送一些请求。 根据 API 的响应,我要么提交偏移量,要么在不提交的情况下跳过消息。

小例子:

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

现在,当来自 REST API 的响应不是以 2 开头时,不会提交偏移量,但不会重新使用消息。如何强制消费者重新使用未提交偏移量的消息?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    如果您打算使用 seek(),请确保您的数据是幂等的。由于您有选择地提交偏移量,因此遗漏的记录可能会在提交(成功处理)记录之前。如果您执行 seek() - 将 groupId 的指针移动到未提交的偏移量并开始重播,您还将获得那些成功处理的消息。它也有可能成为无限循环。

    或者,您可以将不成功记录的元数据保存在内存或数据库中,并从“poll(retention.ms)”开始重播主题,以便重播所有记录,但添加过滤器以仅通过 API 处理元数据与你之前保存的东西。每隔一小时或几个小时执行一次批处理。

    【讨论】:

      【解决方案2】:

      提交偏移量只是存储消费者当前偏移量(也称为位置)的一种方式。因此,如果它停止了,它(或接管的新消费者实例)可以找到它之前的位置并从那里重新开始消费。

      因此,即使您不提交,一旦您收到记录,消费者的位置也会移动。如果你想重新消费一些记录,你必须改变消费者的当前位置。

      使用 Java 客户端,您可以使用seek() 设置位置。

      在您的场景中,您可能想要计算相对于当前位置的新位置。如果是这样,您可以使用position() 找到当前位置。

      【讨论】:

        【解决方案3】:

        以下是您可以采取的替代方法(而不是寻找):

        1. 当 REST 失败时,将消息移动到 adhoc kafka 主题。您可以编写另一个程序以按计划的方式读取该主题的消息。
        2. 当 REST 失败时,将请求写入平面平面。使用 shell/任何脚本读取每个请求并按计划发送。

        【讨论】: