【问题标题】:Simple-Kafka-consumer message delivery duplicationSimple-Kafka-consumer 消息传递复制
【发布时间】:2013-04-24 03:58:05
【问题描述】:

我正在尝试用 Java 实现一个简单的 Producer-->Kafka-->Consumer 应用程序。我能够成功地生产和消费消息,但是当我重新启动消费者时会出现问题,其中一些已经消费的消息再次被消费者从 Kafka 中获取(不是所有消息,而是最后的一些消息)消费消息)。

我在消费者中设置了autooffset.reset=largest,并且我的autocommit.interval.ms 属性设置为1000 毫秒。

“重新传递一些已使用的消息”是一个已知问题,还是我在这里遗漏了任何其他设置?

基本上,有没有办法确保之前消费的消息都不会被消费者拾取/消费?

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    Kafka 使用 Zookeeper 来存储消费者偏移量。由于 Zookeeper 的操作非常缓慢,因此不建议在消费完每条消息后提交 offset。

    可以向消费者添加关闭钩子,在退出前手动提交主题偏移量。但是,这在某些情况下无济于事(例如 jvm 崩溃或kill -9)。为了防止这种情况,我建议实现自定义提交逻辑,该逻辑将在处理每条消息(文件或本地数据库)后在本地提交偏移量,并且每 1000 毫秒向 Zookeeper 提交偏移量。在消费者启动时,这两个位置都应该被查询,并且最多应该使用两个值作为消费偏移量。

    【讨论】:

    • 这听起来可能很愚蠢,但如果我们说例如实现自定义提交逻辑,那么是否可以管理每条消息的偏移量。例如,如果我有两条带有时间戳值的消息,那么我想根据时间戳设置偏移量。因此,如果第二条记录的时间戳较早,那么分配给它的偏移量应该小于另一个。所以在消费时我会收到已经排序的消息。
    猜你喜欢
    • 2019-11-20
    • 2015-02-03
    • 1970-01-01
    • 2020-03-08
    • 2019-12-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多