【发布时间】:2018-04-20 06:21:26
【问题描述】:
我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型。
我正在寻找在某个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要使用它在连接时收到的最后一条消息的 ID 登录。
是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作?
我尝试使用消费者执行以下操作,但在运行控制台消费者时,我看到消息重播。
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
这是预期的行为还是我走错了路?
【问题讨论】:
标签: apache-kafka kafka-consumer-api kafka-producer-api