【问题标题】:How to configure kafka such that we have an option to read from the earliest, latest and also from any given offset?如何配置 kafka 以便我们可以选择从最早的、最新的以及任何给定的偏移量读取?
【发布时间】:2018-01-10 17:05:16
【问题描述】:

我知道将 kafka 配置为从最早或最新消息中读取。 如果我需要从以前的偏移量中读取,我们如何包含一个附加选项? 我需要这样做的原因是由于之前的处理逻辑中的一些错误,需要再次处理之前读取的消息。

【问题讨论】:

    标签: apache-kafka message-queue offset kafka-python sequential-workflow


    【解决方案1】:

    在java kafka客户端中,有一些关于kafka消费者的方法可以用来指定下一个消费位置。

    public void seek(TopicPartition 分区, 长偏移)

    覆盖消费者将在下一次轮询(超时)时使用的获取偏移量。如果多次为同一个分区调用此 API,则在下一次 poll() 中将使用最新的偏移量。请注意,如果在消费过程中随意使用此 API 来重置获取偏移量,您可能会丢失数据

    这就够了,还有seekToBeginning和seekToEnd。

    【讨论】:

    • 如果有 3 个分区,并且最新的偏移量是 12,13 和 15,如果我们想读取自特定时间戳以来的所有消息,我们如何完成呢?
    • 由于时间戳无法读取消息,只是偏移量。您可以阅读所有消息,然后处理您想要的消息如果消息包含时间戳值。
    • 您的意思是说,阅读每条消息并将其在我的脚本中与我正在寻找的时间戳进行比较?
    • 是的,Kafka不支持这个功能,需要写代码来实现。
    • 在 Kafka 0.11 中,您可以在 Java 客户端中获取时间戳的偏移量。见kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/…。您还可以使用管理脚本 bin/kafka-consumer-groups --reset-offsets 从外部更改存储在 Kafka 消费者偏移量主题中的偏移量。也不再需要使用 zookeeper 进行偏移存储(从 0.9 开始)
    【解决方案2】:

    我正在尝试回答一个类似但不完全相同的问题,所以让我们看看我的信息是否可以帮助您。

    首先,I have been working from this other SO question/answer

    简而言之,您想要提交您的偏移量,最常见的解决方案是 ZooKeeper。因此,如果您的消费者遇到错误或需要关闭,它可以从中断的地方恢复。

    我自己正在处理一个非常大的高容量流,我的消费者(用于测试)每次都需要从尾部开始。文档表明我必须使用KafkaConsumer seek 来声明我的起点。

    一旦我的发现成功且可靠,我会尝试在此处更新我的发现。这肯定是一个已解决的问题。

    【讨论】:

    • 自 0.9 以来最常见的存储偏移量的地方是 Kafka 本身(在 __consumer_offsets 主题中)。 Zookeeper 我们仅用于旧消费者 API 中的偏移量。
    猜你喜欢
    • 1970-01-01
    • 2019-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-20
    • 1970-01-01
    • 2021-10-05
    • 2020-08-04
    相关资源
    最近更新 更多