【问题标题】:Kafka consumer manual commit offsetKafka 消费者手动提交偏移量
【发布时间】:2018-11-06 14:27:22
【问题描述】:

我正在实现从 Kafka 获取消息的 dsl spring 集成流程

代码sn-p:

return IntegrationFlows.from(
                Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
                        kafkaPropertiesConfiguration.getTelemetryDataTopic()))
                })
                .handle(bla.someImportantOperation())
                //TODO:do manual commit here
                //.handle(consumer.commitSync())

                .get();

我想知道如何手动提交同步,但只有在 .handle(bla.someImportantOperation()) 成功完成之后。

由于我使用的是 DefaultKafkaConsumerFactory,我不知道如何获取消费者参考,希望能提供任何帮助。

这些是我用来创建消费者的消费者属性:

consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());

consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

【问题讨论】:

    标签: java spring apache-kafka spring-integration spring-integration-dsl


    【解决方案1】:

    Kafka.messageDrivenChannelAdapter() 为您提供了一个配置器钩子:

    .configureListenerContainer(c ->
                                    c.ackMode(ContainerProperties.AckMode.MANUAL))
    

    注意我提供的选项。

    阅读其 Javadocs,然后阅读 AcknowledgingMessageListener。 提到了Acknowledgment。这个通过KafkaHeaders.ACKNOWLEDGMENT 出现在邮件标题中。

    所以,您在//.handle(consumer.commitSync()) 中所需要的就是这样的:

    .handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())
    

    在 Spring 中查看更多关于 Apache Kafka 文档的信息:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets

    【讨论】:

      猜你喜欢
      • 2022-11-13
      • 2018-11-23
      • 2017-08-22
      • 1970-01-01
      • 2021-11-10
      • 2020-08-08
      • 2022-11-11
      • 2020-03-05
      • 2018-02-03
      相关资源
      最近更新 更多