【问题标题】:Getting the last message sent to a kafka topic获取发送到 kafka 主题的最后一条消息
【发布时间】:2018-04-20 06:21:26
【问题描述】:

我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型。

我正在寻找在某个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要使用它在连接时收到的最后一条消息的 ID 登录。

是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作?

我尝试使用消费者执行以下操作,但在运行控制台消费者时,我看到消息重播。

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

这是预期的行为还是我走错了路?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    问题出在final long offset = consumer.committed(partition).offset() 线上,因为link api 引用committed 方法是获取给定分区的the last committed offset,即:您的消费者告诉kafka 服务器它已经读取的最后一个偏移量。 所以,你肯定会得到messages replayed,因为你总是从特定的偏移量读取。 因为我认为我只需要删除第一个 for 块。

    【讨论】:

    • 当然,我应该检查一下 Consumer.position
    • @ryanpudd,我正在尝试在 spark 程序中实现相同的目标,但无法实现。您能否提供完整代码的链接?谢谢
    • 如果我已经消费了主题分区中的最后一条消息,seekToEnd() 不会将指针设置为超过最后一条消息吗?如何重新消费我已经消费的最新消息?
    【解决方案2】:

    查看记录数并获取最后一条消息:

        // Poll so we know we're connected
        consumer.poll(100);
        // Get the assigned partitions
        Set<TopicPartition> assignedPartitions = consumer.assignment();
        // Seek to the end of those partitions
        consumer.seekToEnd(assignedPartitions);
    
        for (TopicPartition partition : assignedPartitions) {
            final long offset = consumer.committed(partition).offset();
            // Seek to the previous message
            consumer.seek(partition, offset - 1);
        }
    
        // Now get the last message
        ConsumerRecords<String, String> records = consumer.poll(100);
        int size = records.count();
        int index = 0;
        for (ConsumerRecord<String, String> record : records) {
            index = index + 1;
            if (index == size) {
                String value = record.value();
                System.out.println("Last Message = " + value);
            }
        }
        consumer.close();
    

    【讨论】:

    • 在最终的长偏移处获得 nullPointerException = kafkaConsumer.committed(partition).offset();
    • 这在大多数情况下都可以正常工作。然而,在exactly-once语义中;它不会工作。
    猜你喜欢
    • 2019-05-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-21
    • 2017-06-12
    相关资源
    最近更新 更多