【问题标题】:Kafka Spring: How to create Listeners dynamically or in a loop?Kafka Spring:如何动态或循环创建监听器?
【发布时间】:2020-07-17 21:29:08
【问题描述】:

我有 4 个 ConsumerFactory 侦听器正在读取 4 个不同的主题,如下所示:

@KafkaListener(
      id = "test1",
      topicPattern  = "test.topic1",
      groupId = "pp-test1")
  public void listenTopic1(ConsumerRecord<String, String> record) {
    System.out.println("Topic is: " + record.topic());   
  }

但是我们将有 50 个主题,我想设置至少 25 个听众以获得更好的性能。我怎样才能动态地做到这一点,而不是手动编写 25 个侦听器?

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    您不能以编程方式创建@KafkaListeners,只能创建离散的侦听器容器(带有自定义侦听器)。

    您可以通过以编程方式为每个侦听器创建一个子应用程序上下文来做到这一点。

    编辑

    @SpringBootApplication
    public class So53715268Application {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(So53715268Application.class, args);
            for (int i = 0; i < 2; i++) {
                AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
                child.setParent(context);
                child.register(ChildConfig.class);
                Properties props = new Properties();
                props.setProperty("group", "group." + i);
                props.setProperty("topic", "topic" + i);
                PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
                child.getEnvironment().getPropertySources().addLast(pps);
                child.refresh();
            }
        }
    
    }
    

    @Configuration
    @EnableKafka
    public class ChildConfig {
    
        @Bean
        public Listener listener() {
            return new Listener();
        }
    
    }
    

    public class Listener {
    
        @KafkaListener(id = "${group}", topics = "${topic}")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    : partitions assigned: [topic0-0]
    : partitions assigned: [topic1-0]
    

    请注意,如果您使用的是 Spring Boot,则子配置类和侦听器必须与主应用程序位于不同的包中(而不是子包)。

    【讨论】:

    • 谢谢。你有可以做的示例代码吗?
    • 你好,你写的代码有问题。如果我复制它,它会告诉我 addLast() 不适用于 PropertiesPropertySource。我应该如何改变它?
    • 你一定有问题;该代码来自工作中的 Spring Boot 应用程序。 child.getEnvironment().getPropertySources() 返回一个 MutablePropertySources,它确实有 addLast()
    【解决方案2】:

    假设您的侦听器方法是名为侦听器的 Springboot 应用程序的一部分。从配置应用 yml 中读取主题名称:

    @KafkaListener(
      topics = "${ai.kafka.consumer.topic-name}")
    public void listenTopic1(ConsumerRecord<String, String> record) {
    System.out.println("Topic is: " + record.topic());   
    }
    

    然后部署任意数量的 Listener,每个 Listener 在 application.yml 中都有不同的 topic-name 值。 (如果您使用的是 Docker 容器,那就更容易了)。

    【讨论】:

      猜你喜欢
      • 2017-12-01
      • 2014-10-13
      • 2017-05-22
      • 1970-01-01
      • 1970-01-01
      • 2022-01-03
      • 1970-01-01
      • 1970-01-01
      • 2020-10-03
      相关资源
      最近更新 更多