【问题标题】:How do I read Kafka messages in consumer from last committed offset in Apache Nifi?如何从 Apache Nifi 中上次提交的偏移量读取消费者中的 Kafka 消息?
【发布时间】:2019-01-06 19:09:44
【问题描述】:

我已经开始我的生产者向 Kafka 发送数据,并开始我的消费者来提取相同的数据。当我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0)时,我的脑海中几乎没有与卡夫卡消费者。

Q.1) 当我第一次启动我的 ConsumeKafka 处理器时,如何从开始和当前消息中读取消息?

Q.2) 以及在 Kafka 中消费者关闭的情况下,如何在最后一条消费消息之后读取消息?

我们如何在使用 Apache Nifi 的同时实现以上两个?

【问题讨论】:

    标签: apache-kafka bigdata kafka-consumer-api apache-nifi kylo


    【解决方案1】:

    ConsumeKafka 处理器有一个名为“Offset Reset”的属性,当消费者组 id 没有先前的偏移量或偏移量不再存在时,将使用该属性。此属性的选项为“Offset Latest”或“Offset Earliest”,默认为最新。

    因此,如果您使用以前从未使用过的消费者组 ID 启动 ConsumeKafka 处理器,那么它会从最新消息开始消费。之后,如果您启动和停止处理器,它将从上次消耗的偏移量开始。

    如果你想再次使用“偏移重置”来强制它为最早或最新,那么你需要更改消费者组 id,因为否则现有的消费者组将始终使用现有的偏移量开始。

    您不能同时从头和当前阅读消息,您可以从头开始并一直阅读到当前,或者从当前开始。这是 Kafka 的工作方式,并不特定于 NiFi。

    【讨论】:

    • 谢谢。我们可以每次都将“偏移重置”选项设置为“最早”吗?所以在那种情况下,没有数据丢失我的意思是历史数据和当前数据,我正确吗?。
    • 您只在第一次启动处理器时设置偏移重置,之后它总是从最早到最新读取并从中断处重新开始,因此不会丢失数据
    • 好的,谢谢。如果我将“偏移重置”选项设置为“最早”,是否有任何缺点?为什么默认将此选项设置为“最新”(任何特定原因)?
    • 没有缺点,对于默认行为,它归结为当有人第一次针对可能包含数百万条消息的现有主题启动处理器时,如果默认行为最早,则用户可能不会意识到他们将要消耗数百万条消息
    • 谢谢。
    猜你喜欢
    • 2022-11-13
    • 2017-05-12
    • 1970-01-01
    • 1970-01-01
    • 2016-02-14
    • 2014-09-22
    • 2022-11-11
    • 2016-09-09
    • 1970-01-01
    相关资源
    最近更新 更多