【问题标题】:How to commit offsets thread safe using camel-kafka?如何使用骆驼卡夫卡提交偏移线程安全?
【发布时间】:2019-06-22 15:40:39
【问题描述】:

正如问题How to manually control the offset commit with camel-kafka? 所问的那样,我想使用camel-kafka 手动提交偏移量。我的路线:

.from(kafka:topic1)
 .aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
 .process(new ManualCommitProcessor())

,其中ManualCommitProcessor将消息发送到另一个主题后进行承诺。

问题是聚合器和 kafka 生产者在负责偏移承诺的 kafka 消费者的单独线程中工作。因此,我以

结尾
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

是否有可能在聚合和分派提交偏移后再次调用消费者线程?

【问题讨论】:

    标签: apache-kafka apache-camel


    【解决方案1】:

    不,这是不可能的,消费者线程独立于聚合器的输出运行。

    【讨论】:

    • Ok.@Claus Ibsen:我需要聚合器,因为我发现生产者不会在没有它的情况下批量记录。但是,使用自动提交存在丢失数据的风险(这对我们的用例来说是一个真正的问题),因为提交是在聚合真正发送到另一个主题之前完成的。有没有其他方法可以解决这个问题?
    • @Claus Ibsen:camel-kafka 线程不安全?我观察到在我的路线中使用线程时。几个小时后,我的路线再次消耗了相同的消息。我已经自动提交到 true & producer requestRequiredAcks=0
    猜你喜欢
    • 2019-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-22
    • 1970-01-01
    • 2015-11-22
    • 1970-01-01
    • 2019-12-15
    相关资源
    最近更新 更多