【问题标题】:How to achieve Exactly Once semantics when archive kafka message into AWS S3?将kafka消息归档到AWS S3时如何实现Exactly Once语义?
【发布时间】:2016-04-14 07:58:30
【问题描述】:

如何在一个 S3 PutObject 事务中存储带有分区偏移量的 kafka 消息数据以实现 Exactly Once 语义?可以吗?

【问题讨论】:

    标签: amazon-s3 apache-kafka offset


    【解决方案1】:

    是的,应该可以。一种方法是控制偏移管理。

    您的消费者可以一次从 Kafka 读取一条消息并将其作为对象放入 AWS,同时将该存储偏移量 + 分区名称作为 AWS 中的键。现在假设您的客户端崩溃了。当它下次出现时,您查询 S3 以查找 S3 中的最后一个偏移量并从那里开始读取消息。为了在将消息放入 S3 之前进行额外保护,请检查具有该密钥的对象(如果您的生产者为消息生成 UUID 并且您可以使用它会更好)如果是,则不要覆盖它而是跳过消息。

    kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
            while(topicPartitionIterator.hasNext()){
                  TopicPartition topicPartition = topicPartitionIterator.next();
                  System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is kafkaConsumer.committed(topicPartition) 
                  System.out.println("Resetting offset to " + startingOffset);
                  kafkaConsumer.seek(topicPartition, startingOffset);
               }
            }
          }
      });
    

    希望有帮助

    【讨论】:

    • 是的,你的想法是可能的。但是s3中可能有很多offset+partition key,查询最后一个offset会越来越慢。除了使用一些 mem db 之外,检查 s3 中存在的密钥也不容易。此外,我们需要在日期时间分组 s3 数据,以便在需要时恢复特定日期范围的数据。那么如何设计s3 key呢?
    猜你喜欢
    • 2020-08-20
    • 2019-08-25
    • 2019-12-10
    • 1970-01-01
    • 1970-01-01
    • 2019-08-25
    • 2017-03-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多