【问题标题】:How can I fetch data from specific offset in Kafka?如何从 Kafka 中的特定偏移量获取数据?
【发布时间】:2019-06-16 15:52:15
【问题描述】:

我正在使用 kafka-node 客户端从 kafka 发布和订阅。我想获取特定偏移量的特定消息。

我不知道这怎么可能。如果有人有解决方案,请帮忙。

谢谢

【问题讨论】:

  • 你可以使用方法seek(TopicPartition partition, long offset)
  • 感谢您的建议,但是我在哪里可以找到这种方法。我用的是kafka-node客户端,没有这样的方法。
  • 有一个相等的方法setOffset
  • 不工作,我正在分享我的代码。 var kafka = require('kafka-node'), Consumer = kafka.Consumer, client = new kafka.KafkaClient("127.0.0.1:9092"), consumer = new Consumer(client, [ { topic: 'TopicName1', partition : 0 } ], { autoCommit: false } ); console.log(consumer.setOffset('TopicName1', 0, 0))

标签: node.js apache-kafka kafka-consumer-api


【解决方案1】:

Kafka 是用于数据流处理和缓冲的消息代理,它不是数据库。所以随机访问数据是不可能的。有限读取策略是从特定分区读取数据。最多,你可以设计你的生产者根据你的阅读需求来分发数据。例如,在从传感器接收数据时,可以创建一个有 24 个分区的主题,并根据采样时间戳发布传感器数据。现在,您在特定分区中拥有每小时数据。

然而,这种策略并不适合卡夫卡哲学!在使用来自 Kafka 的数据时,您可以根据分区数量达到最大的数据并行度,每个分区一个消费者。但是,当您根据采样时间戳将每条传入消息发布到分区时,一次只有一个分区正在缓冲数据,您的应用程序可以并行处理数据!

【讨论】:

    【解决方案2】:

    当然,一般来说这是可能的,因为低级 Kafka 协议的 fetch 请求允许指定起始偏移量。

    看一下我不太了解的 kafka-node 库,我看到使用 addTopics 函数可以传递主题信息以及要开始阅读的偏移量和您还需要将fromOffset 参数设置为true。 还有setOffset 方法可以做到这一点。 以下参考:https://github.com/SOHU-Co/kafka-node#consumer

    【讨论】:

    • 感谢您的建议,我只想使用特定偏移量获取单个消息。如果我使用 fromOffset 那么它将从给定的偏移量消耗到最后一个偏移量,但我不希望这样。我尝试了 setOffset 方法,但它不起作用。
    • 那是不可能的。您必须在应用程序级别处理此问题,以便从该偏移量获取更多消息,但只处理第一个。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-02-02
    • 1970-01-01
    • 2017-01-10
    • 1970-01-01
    • 2017-12-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多