【发布时间】:2021-08-06 13:22:54
【问题描述】:
我正在使用Spring-Kakfa连接kakfa集群,我需要在分配所有主题的所有主题分区后执行一段代码。 以下面的代码为例
@Component
public class MyKafkaMessageListener1 {
@KafkaListener(topics = "topic1" , groupId = "cg1")
public void handleMessage(String message){
System.out.println("Msg:"+message);
}
}
@Component
public class MyKafkaMessageListener2 {
@KafkaListener(topics = "topic2, topic3" , groupId = "cg2")
public void handleMessage(String message){
System.out.println("Msg:"+message);
}
}
我需要在 topic1、topic2 和 topic3 的所有主题分区分配给线程后执行。是否可以? kafka 或者 spring-kafka 中有这样的 EventHandler 吗?
【问题讨论】:
-
您可以通过构造函数/
@AutoWired字段将回调接口注入到两个侦听器类中,该字段在handleMessage上触发一些事件,然后跟踪正在调用哪个侦听器/线程,但这不会保证topic2,topic3都被消费了,只是你从其中一个那里得到了一些消息 -
我不明白该方法是如何工作的,handle 方法中的自定义事件如何解决这个问题?我也在寻找一种在分配所有主题的所有分区后通知的方法
-
在
handleMessage中,您调用其他方法(您需要传递给每个侦听器类的对象)来“通知”它已被调用(因此线程具有已激活)。如何“跟踪”哪个主题/线程是哪个,或者线程是否停止,是一个单独的问题 -
感谢您的澄清。这不适用于我的用例,在分配所有主题的所有分区之后,我只想收到一次通知。
-
当然,我的建议是分两步解决。您需要单独跟踪所有线程,一旦您收集了所有线程的消息,然后发送您真正想要的事件
标签: java apache-kafka spring-kafka