【问题标题】:Kafka consumer poll newest messageKafka消费者投票最新消息
【发布时间】:2021-01-28 20:56:24
【问题描述】:

我正在使用CppKafka 对 Kafka 消费者进行编程。我希望当我的消费者启动时,它只会轮询新到达的消息(即消息在消费者开始时间之后到达)而不是消费者偏移量的消息。

// Construct the configuration
Configuration config = {
    { "metadata.broker.list", "127.0.0.1:9092" },
    { "group.id", "1" },
    // Disable auto commit
    { "enable.auto.commit", false },
    // Set offest to latest to receive latest message when consumer start working
    { "auto.offset.reset", "latest" },
};

// Create the consumer
Consumer consumer(config);

consumer.set_assignment_callback([](TopicPartitionList& partitions) {
    cout << "Got assigned: " << partitions << endl;
});

// Print the revoked partitions on revocation
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
    cout << "Got revoked: " << partitions << endl;
});


string topic_name = "test_topic";
// Subscribe to the topic
consumer.subscribe({ topic_name });

据我了解,配置auto.offset.reset 设置为latest 仅在消费者开始读取分配的分区时没有提交偏移量的情况下才有效。所以我在这里的猜测是我应该打电话给consumer.poll()而不提交,但感觉不对,我担心我会在此过程中破坏一些东西。谁能告诉我实现我的要求的正确方法?

【问题讨论】:

    标签: c++ apache-kafka


    【解决方案1】:

    如果“enable.auto.commit”设置为 false 并且您没有在代码中提交偏移量,那么每次您的消费者启动时,如果 auto.offset.reset=earliest,它就会从主题中的第一条消息开始消费.

    auto.offset.reset 的默认值为“latest”,这意味着如果缺少有效的偏移量,消费者将从最新的记录(消费者开始运行后写入的记录)开始读取。

    根据您上面的问题,auto.offset.reset=latest 应该可以解决您的问题。

    但是,如果您需要基于实时的偏移量,则需要在您的消费者中应用时间过滤器。这意味着从主题中获取消息,将偏移时间与消息负载中的某个自定义字段或消息的元属性(ConsumerRecord.timestamp())进行比较,并相应地进行进一步处理。

    也可以参考这个答案Retrieve Timestamp based data from Kafka

    【讨论】:

    • 如果我的消费者不提交,无论我如何设置auto.offset.reset 配置,它都会从第一条消息开始?
    • 修改了我的答案,使其更加清晰,基本上是“最新的”,意思是缺少有效的偏移量,消费者将从最新的记录(消费者开始运行后写入的记录)开始读取。另一种选择是“最早的”,这意味着如果缺少有效的偏移量,消费者将从头开始读取分区中的所有数据。
    • 非常感谢。我会尝试一些不同的方法(包括你的建议),看看哪种方法效果最好。另一种选择是在分配分区的回调中,我可以手动将该分区中的偏移量设置为分区的末尾。
    【解决方案2】:

    使用 seekToEnd(Collection partitions) 方法。 寻找每个给定分区的最后一个偏移量。此函数延迟计算,仅在调用 poll(long) 时才在所有分区中寻找最终偏移量。如果没有提供分区,则寻找所有当前分配的分区的最终偏移量。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-09-13
      • 1970-01-01
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-09
      • 1970-01-01
      相关资源
      最近更新 更多