【发布时间】:2018-01-10 17:05:16
【问题描述】:
我知道将 kafka 配置为从最早或最新消息中读取。 如果我需要从以前的偏移量中读取,我们如何包含一个附加选项? 我需要这样做的原因是由于之前的处理逻辑中的一些错误,需要再次处理之前读取的消息。
【问题讨论】:
标签: apache-kafka message-queue offset kafka-python sequential-workflow
我知道将 kafka 配置为从最早或最新消息中读取。 如果我需要从以前的偏移量中读取,我们如何包含一个附加选项? 我需要这样做的原因是由于之前的处理逻辑中的一些错误,需要再次处理之前读取的消息。
【问题讨论】:
标签: apache-kafka message-queue offset kafka-python sequential-workflow
在java kafka客户端中,有一些关于kafka消费者的方法可以用来指定下一个消费位置。
public void seek(TopicPartition 分区, 长偏移)
覆盖消费者将在下一次轮询(超时)时使用的获取偏移量。如果多次为同一个分区调用此 API,则在下一次 poll() 中将使用最新的偏移量。请注意,如果在消费过程中随意使用此 API 来重置获取偏移量,您可能会丢失数据
这就够了,还有seekToBeginning和seekToEnd。
【讨论】:
我正在尝试回答一个类似但不完全相同的问题,所以让我们看看我的信息是否可以帮助您。
首先,I have been working from this other SO question/answer
简而言之,您想要提交您的偏移量,最常见的解决方案是 ZooKeeper。因此,如果您的消费者遇到错误或需要关闭,它可以从中断的地方恢复。
我自己正在处理一个非常大的高容量流,我的消费者(用于测试)每次都需要从尾部开始。文档表明我必须使用KafkaConsumer seek 来声明我的起点。
一旦我的发现成功且可靠,我会尝试在此处更新我的发现。这肯定是一个已解决的问题。
【讨论】: