【发布时间】:2021-04-26 00:00:25
【问题描述】:
我有一个前端,可以触发将我的 Kafka 主题更改为另一个主题。当我这样做时,Java Springboot 后端也应该更改收听该新主题以使用传入消息。问题是这必须在运行时期间发生。因此@KafkaListener 不是一个选项,因为它至少在启动时需要主题名称。
我将新主题作为 UUID 字符串传递给下面显示的方法。这是许多尝试之一,它不会识别新 uuid 主题中的任何消息(即使有消息)。新主题和消息由另一个服务生成(这部分工作正常)。我从另一个对我没有帮助的问题中得到了这个例子:Spring Kafka - Subscribe new topics during runtime 我还阅读了:How to create separate Kafka listener for each topic dynamically in springboot?
尽管如此,在应用程序启动和第一次调用 changeListener 方法期间,我在控制台中得到了这条日志记录行:
INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f
目标是每次在Kafka的新主题中有消息时,使用// do other stuff with message调用MessageListener的方法。
是否有可能在运行时更改主题以及如何更改?
如果您需要更多信息,请随时询问。
public void changeListener(String uuid) {
ContainerProperties containerProps = new ContainerProperties(uuid);
containerProps.setMessageListener(
(MessageListener<UUID, String>) message -> {
LOG.info("received: " + message);
// do other stuff with message
}
);
KafkaMessageListenerContainer<UUID, String> container =
new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
container.start();
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
【问题讨论】:
标签: spring-kafka