【发布时间】:2016-04-14 07:58:30
【问题描述】:
如何在一个 S3 PutObject 事务中存储带有分区偏移量的 kafka 消息数据以实现 Exactly Once 语义?可以吗?
【问题讨论】:
标签: amazon-s3 apache-kafka offset
如何在一个 S3 PutObject 事务中存储带有分区偏移量的 kafka 消息数据以实现 Exactly Once 语义?可以吗?
【问题讨论】:
标签: amazon-s3 apache-kafka offset
是的,应该可以。一种方法是控制偏移管理。
您的消费者可以一次从 Kafka 读取一条消息并将其作为对象放入 AWS,同时将该存储偏移量 + 分区名称作为 AWS 中的键。现在假设您的客户端崩溃了。当它下次出现时,您查询 S3 以查找 S3 中的最后一个偏移量并从那里开始读取消息。为了在将消息放入 S3 之前进行额外保护,请检查具有该密钥的对象(如果您的生产者为消息生成 UUID 并且您可以使用它会更好)如果是,则不要覆盖它而是跳过消息。
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
while(topicPartitionIterator.hasNext()){
TopicPartition topicPartition = topicPartitionIterator.next();
System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is kafkaConsumer.committed(topicPartition)
System.out.println("Resetting offset to " + startingOffset);
kafkaConsumer.seek(topicPartition, startingOffset);
}
}
}
});
希望有帮助
【讨论】: