【发布时间】:2021-01-28 20:56:24
【问题描述】:
我正在使用CppKafka 对 Kafka 消费者进行编程。我希望当我的消费者启动时,它只会轮询新到达的消息(即消息在消费者开始时间之后到达)而不是消费者偏移量的消息。
// Construct the configuration
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" },
{ "group.id", "1" },
// Disable auto commit
{ "enable.auto.commit", false },
// Set offest to latest to receive latest message when consumer start working
{ "auto.offset.reset", "latest" },
};
// Create the consumer
Consumer consumer(config);
consumer.set_assignment_callback([](TopicPartitionList& partitions) {
cout << "Got assigned: " << partitions << endl;
});
// Print the revoked partitions on revocation
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
cout << "Got revoked: " << partitions << endl;
});
string topic_name = "test_topic";
// Subscribe to the topic
consumer.subscribe({ topic_name });
据我了解,配置auto.offset.reset 设置为latest 仅在消费者开始读取分配的分区时没有提交偏移量的情况下才有效。所以我在这里的猜测是我应该打电话给consumer.poll()而不提交,但感觉不对,我担心我会在此过程中破坏一些东西。谁能告诉我实现我的要求的正确方法?
【问题讨论】:
标签: c++ apache-kafka