【发布时间】:2018-11-06 14:27:22
【问题描述】:
我正在实现从 Kafka 获取消息的 dsl spring 集成流程
代码sn-p:
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
kafkaPropertiesConfiguration.getTelemetryDataTopic()))
})
.handle(bla.someImportantOperation())
//TODO:do manual commit here
//.handle(consumer.commitSync())
.get();
我想知道如何手动提交同步,但只有在 .handle(bla.someImportantOperation()) 成功完成之后。
由于我使用的是 DefaultKafkaConsumerFactory,我不知道如何获取消费者参考,希望能提供任何帮助。
这些是我用来创建消费者的消费者属性:
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
【问题讨论】:
标签: java spring apache-kafka spring-integration spring-integration-dsl