【问题标题】:Spring Kafka Event after all listeners are started启动所有侦听器后的 Spring Kafka 事件
【发布时间】:2021-08-06 13:22:54
【问题描述】:

我正在使用Spring-Kakfa连接kakfa集群,我需要在分配所有主题的所有主题分区后执行一段代码。 以下面的代码为例

     @Component
     public class MyKafkaMessageListener1 {
    
         @KafkaListener(topics = "topic1" , groupId = "cg1")
         public void handleMessage(String message){
               System.out.println("Msg:"+message);
         }
     }

     @Component
     public class MyKafkaMessageListener2 {
    
         @KafkaListener(topics = "topic2, topic3" , groupId = "cg2")
         public void handleMessage(String message){
               System.out.println("Msg:"+message);
         }
     }

我需要在 topic1、topic2 和 topic3 的所有主题分区分配给线程后执行。是否可以? kafka 或者 spring-kafka 中有这样的 EventHandler 吗?

【问题讨论】:

  • 您可以通过构造函数/@AutoWired 字段将回调接口注入到两个侦听器类中,该字段在handleMessage 上触发一些事件,然后跟踪正在调用哪个侦听器/线程,但这不会保证topic2,topic3 都被消费了,只是你从其中一个那里得到了一些消息
  • 我不明白该方法是如何工作的,handle 方法中的自定义事件如何解决这个问题?我也在寻找一种在分配所有主题的所有分区后通知的方法
  • handleMessage 中,您调用其他方法(您需要传递给每个侦听器类的对象)来“通知”它已被调用(因此线程具有已激活)。如何“跟踪”哪个主题/线程是哪个,或者线程是否停止,是一个单独的问题
  • 感谢您的澄清。这不适用于我的用例,在分配所有主题的所有分区之后,我只想收到一次通知。
  • 当然,我的建议是分两步解决。您需要单独跟踪所有线程,一旦您收集了所有线程的消息,然后发送您真正想要的事件

标签: java apache-kafka spring-kafka


【解决方案1】:

实现ConsumerSeekAware(或扩展AbstractConsumerSeekAware),或添加ConsumerRebalanceListener,您将收到对onPartitionsAssigned()的调用。

【讨论】:

  • Gary,我已经尝试过了,但是每次将主题分区分配给线程时都会调用 onPartitionsAssigned(),所以如果一个主题有 20 个分区,它会被调用 20 次,我必须知道该主题具有的分区并记录 onPartitionsAssigned 方法被调用的次数,以及如果提供者增加分区数,我的代码将中断。此外,如果我在不同的类上有 @KafkaListener 注释,如示例中所示,它也不起作用?
  • 是的,它将与多个侦听器一起使用,您只需将状态保持在一个公共类中。您可以使用AdminClient 获取有关主题的信息(分区数)。 describeTopics().
  • 是的,这会起作用,但问题是我们正在使用带有 Kafka API 的 Azure 事件中心,当我们创建 SAS 密钥时,它只能选择从主题中读取,没有使用权限带有该 SAS 密钥的 AdminClient API :(
猜你喜欢
  • 1970-01-01
  • 2012-04-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-29
  • 2017-09-01
相关资源
最近更新 更多