【问题标题】:Integrating Spring Boot and Reactor-Kafka's KafkaReceiver集成 Spring Boot 和 Reactor-Kafka 的 KafkaReceiver
【发布时间】:2019-08-10 01:45:00
【问题描述】:

我正在尝试使用库 reactor-kafka 开发 Spring Boot 应用程序,以响应从 Kafka 主题读取的一些消息。

我有一个构建 KafkaReceiver 的配置类。

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, string>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    } 
}

嗯……现在呢?使用非反应性 spring-kafka 库,我可以用 @KafkaListener 注释一个方法,Spring Boot 将为我创建一个从 Kafka 主题监听的线程。

我应该把KafkaReceiver 放在哪里呢?在所有示例中,我发现直接使用main 方法,但这不是引导方式

我正在使用 Spring Boot 2.1.3 和 Reactor-Kafka 1.1.0

提前致谢。

【问题讨论】:

    标签: java spring spring-boot project-reactor spring-kafka


    【解决方案1】:

    既然你有KafkaReceiver bean,现在你可以这样做:

    @Bean
    public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
            return args -> {
                    kafkaReceiver.receive()
                              ...
                              .sunbscribe();
            };
    }
    

    ApplicationContext 准备好时,这个ApplicationRunner bean 将被踢出。有关更多信息,请参阅其 JavaDocs。

    【讨论】:

    • 非常感谢。也许,您可以在文档中的某处添加此集成 :)
    • 如何运行同一个消费组的多个消费者?
    • 您介意提出一个包含更多详细信息的新 SO 线程吗?看起来和这个旧的没有关系...
    猜你喜欢
    • 1970-01-01
    • 2021-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多