【问题标题】:Change listening topic during runtime in Spring Kafka在 Spring Kafka 运行时更改监听主题
【发布时间】:2021-04-26 00:00:25
【问题描述】:

我有一个前端,可以触发将我的 Kafka 主题更改为另一个主题。当我这样做时,Java Springboot 后端也应该更改收听该新主题以使用传入消息。问题是这必须在运行时期间发生。因此@KafkaListener 不是一个选项,因为它至少在启动时需要主题名称。
我将新主题作为 UUID 字符串传递给下面显示的方法。这是许多尝试之一,它不会识别新 uuid 主题中的任何消息(即使有消息)。新主题和消息由另一个服务生成(这部分工作正常)。我从另一个对我没有帮助的问题中得到了这个例子:Spring Kafka - Subscribe new topics during runtime 我还阅读了:How to create separate Kafka listener for each topic dynamically in springboot?
尽管如此,在应用程序启动和第一次调用 changeListener 方法期间,我在控制台中得到了这条日志记录行:

INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f

目标是每次在Kafka的新主题中有消息时,使用// do other stuff with message调用MessageListener的方法。
是否有可能在运行时更改主题以及如何更改?
如果您需要更多信息,请随时询问。

  public void changeListener(String uuid) {
    ContainerProperties containerProps = new ContainerProperties(uuid);
    containerProps.setMessageListener(
        (MessageListener<UUID, String>) message -> {
          LOG.info("received: " + message);
          // do other stuff with message
        }
    );
    KafkaMessageListenerContainer<UUID, String> container =
        new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
    container.start();
  }

  private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  }

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    据我所知,浏览源代码时,您无法在运行时更改主题。因此,您需要停止当前容器并重新创建一个新容器。

    我建议不要在这种情况下使用注册表并自己管理容器,因为您似乎无法从注册表中删除容器并且最终会导致内存泄漏。

    您可以自动接线KafkaListenerContainerFactory。这个工厂需要一个端点。我必须承认,如果您只想更改主题并调用回调,那么设置端点对我来说似乎有点痛苦,因为所有可用的实现都使用带有 bean 和方法引用的元编程。

    以下片段应该可以帮助您入门,尽管它可能需要更多调整。

    @SpringBootApplication
    @EnableKafka
    public class KafkaDemoApplication {
        private KafkaListenerContainerFactory<?> factory;
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaDemoApplication.class, args);
        }
    
    
        @Autowired
        public void setFactory(KafkaListenerContainerFactory<?> factory) {
            this.factory = factory;
        }
    
        @EventListener(classes = {ApplicationStartedEvent.class})
        public void onStarted() throws InterruptedException, NoSuchMethodException {
            var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
            registry.stop();
            listenerContainer.start();
            Thread.sleep(2000);
            listenerContainer.stop();
    
            listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
            listenerContainer.start();
            Thread.sleep(2000);
            listenerContainer.stop();
        }
    
        private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
            var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
            listenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
            listenerEndpoint.setBean(this);
            listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
            listenerEndpoint.setTopics(topic);
    
            return listenerEndpoint;
        }
    
        public void onMessage(String key, String value) {
            System.out.println(key + ":" + value)
        }
    }
    

    附带说明,如果您想访问注册表,您可以实现 KafkaListenerConfigurer,因为它不是自动装配的。但同样,如果您想杀死您的容器,请不要使用它,因为据我所知,您无法删除引用。

    【讨论】:

    • 通过这种尝试让它现在运行。创建一个新的 ListenerEndpoint 并将其用于每个新主题,然后在 facrory 中创建一个新容器就可以了。如果有人需要我的代码示例,现在就告诉我,我可以发布详细信息。
    【解决方案2】:

    像这样手动创建容器是个坏主意,因为它需要由 spring 初始化;您将无法获得完整的功能。

    如果您使用的是弹簧靴;使用它的ConcurrentMessageListenerContainerFactory 创建一个容器。

    如果你不使用引导,添加你自己的ConcurrentMessageListenerContainerFactory@Bean

    对多个侦听器容器使用相同的group.id 也不是一个好主意,因为对一个容器进行重新平衡会导致对其他容器进行不必要的重新平衡。

    为了读取新主题中的现有记录,您必须设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="earliest:"(默认为latest)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-22
      • 2016-12-30
      • 2022-01-03
      • 1970-01-01
      • 1970-01-01
      • 2019-10-24
      • 2015-09-08
      相关资源
      最近更新 更多