【发布时间】:2019-06-22 15:40:39
【问题描述】:
正如问题How to manually control the offset commit with camel-kafka? 所问的那样,我想使用camel-kafka 手动提交偏移量。我的路线:
.from(kafka:topic1)
.aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
.process(new ManualCommitProcessor())
,其中ManualCommitProcessor将消息发送到另一个主题后进行承诺。
问题是聚合器和 kafka 生产者在负责偏移承诺的 kafka 消费者的单独线程中工作。因此,我以
结尾java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
是否有可能在聚合和分派提交偏移后再次调用消费者线程?
【问题讨论】: