【问题标题】:How to create separate Kafka listener for each topic dynamically in springboot?如何在spring boot中为每个主题动态创建单独的Kafka监听器?
【发布时间】:2017-05-22 20:25:02
【问题描述】:

我是 Spring 和 Kafka 的新手。我正在研究一个用例 [使用 SpringBoot-kafka],其中允许用户在运行时创建 kafka 主题。 Spring 应用程序应在运行时以编程方式订阅这些主题。到目前为止我所知道的是,Kafka 侦听器是设计时间,因此需要在启动之前指定主题。 SpringBoot-Kafka集成有没有办法动态订阅kafka主题?

推荐这个 https://github.com/spring-projects/spring-kafka/issues/132

我计划实施的当前方法是,不要使用 Spring-Kafka 集成,而是自己实施 Kafka 消费者 [使用 java 代码],如此处所述 spring boot kafka consumer - how to properly consume kafka messages from spring boot

【问题讨论】:

    标签: spring spring-mvc spring-boot kafka-consumer-api spring-kafka


    【解决方案1】:

    如果您想使用注释指定Kafka 侦听器,则它们只是“设计时”。 Spring-kafka 也允许您动态创建它们,请参阅KafkaMessageListenerContainer

    动态创建的 Kafka 侦听器的最简单示例是:

    Map<String, Object> consumerConfig = ImmutableMap.of(
        BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
        GROUP_ID_CONFIG, "groupId"
    );
    
    DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
            new DefaultKafkaConsumerFactory<>(
                    consumerConfig,
                    new StringDeserializer(),
                    new StringDeserializer());
    
    ContainerProperties containerProperties = new ContainerProperties("topicName");
    containerProperties.setMessageListener((MessageListener<String, String>) record -> {
         //do something with received record
    } 
    
    ConcurrentMessageListenerContainer container =
            new ConcurrentMessageListenerContainer<>(
                    kafkaConsumerFactory,
                    containerProperties);
    
    container.start();
    

    更多解释和代码见这篇博文:http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/

    【讨论】:

    • 在这个实现中,我们可以批量消费消息吗?
    猜你喜欢
    • 1970-01-01
    • 2020-03-11
    • 2019-02-22
    • 1970-01-01
    • 2018-03-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-11
    相关资源
    最近更新 更多