【问题标题】:Fetching offset from kafka high-level consumer从 kafka 高级消费者获取偏移量
【发布时间】:2016-03-11 05:31:52
【问题描述】:

我想从 java 程序中的高级消费者获取 kafka 消息偏移量。由于我使用的是自定义 commitoffset 属性,因此我想测试我的自定义 commitoffset 是否正常工作。谁能帮我如何抵消??? 我遇到了几个 kafka 工具(如 getoffsetshell),但它对我的测试没有帮助。

【问题讨论】:

  • 每条消息发送到Kafka后是否要获取偏移量?喜欢带有相关 Kafka 元数据信息的回调方法?

标签: java apache-kafka


【解决方案1】:

当收到来自ConsumerIterator 的消息时,您还可以通过执行以下操作来获取偏移量:

    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(getConsumerConfig());
    KafkaStream<byte[], byte[]> stream = getKafkaStream(consumerConnector);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while(iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
        String message = new String(messageAndMetadata.message());
        long offset = messageAndMetadata.offset();
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-09-09
    • 2020-06-11
    • 1970-01-01
    • 2017-01-20
    • 2019-01-18
    • 2015-07-06
    • 2019-05-01
    • 2018-02-03
    相关资源
    最近更新 更多