【问题标题】:How to commit consumer offsets using Camel-kafka?如何使用 Camel-kafka 提交消费者补偿?
【发布时间】:2015-09-21 10:18:11
【问题描述】:

我正在使用 apache camel 来整合我的 kafka 消息传递。另外我正在使用 JAVA DSL 来使用来自 kafka 端点的消息。

使用 apache kafka API 知道如何通过给定属性切换来提交消费者偏移量。

如果我在 camel-kafka 组件中禁用自动提交,那么我如何在 apcahe camel 中提交偏移量?

我正在使用下面的端点来禁用 Apache Camel 中的自动提交

kafka.consumer.uri = kafka://{{kafka.host}}?zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}&groupId={{kafka.consumerGroupId}}&topic={{kafka.topic.transformer_t}}&{{kafka.uri.options}}
kafka.uri.options = autoCommitEnable=false

【问题讨论】:

  • 您找到解决方案了吗?
  • 不是真的,但如果有的话,请查看 Camel-Kafka 中的最新规格。我停止使用 Camel-Kafka,因为不支持手动提交 Offset。
  • 这对我来说也是一场表演。另见:stackoverflow.com/questions/45947180/…

标签: java apache-camel


【解决方案1】:

CAMEL-11933 中添加了对手动偏移提交的支持,并且从版本 2.21.0 开始支持。您可以使用选项allowManualCommit=true 启用它。此选项将公开标头CamelKafkaManualCommit,包含KafkaManualCommit 的实例,然后您可以使用它,例如在Processor.

from("kafka:topic?groupId=group&autoCommitEnable=false&allowManualCommit=true")
  //...
  .process(exchange -> {
     exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class).commitSync();
  });

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-01-31
    • 2016-12-09
    • 2020-08-24
    • 1970-01-01
    • 1970-01-01
    • 2016-10-10
    • 2017-03-19
    • 2022-11-13
    相关资源
    最近更新 更多