【问题标题】:How to Invoke spring kafkaListener method manually如何手动调用spring kafkaListener方法
【发布时间】:2022-01-15 11:09:19
【问题描述】:

我创建了一个 java 类和方法有 @KafkaListener。但我希望只有当我从 UI 中单击按钮时才需要调用此方法。当我从 UI 中单击一个按钮时,调用会转到控制器类,而我想从控制器类调用这个类。怎么能做到这一点? 我没有添加任何类级别的注释。

public class MessageListener implements ConsumerSeekAware {

   
    private String topicName;
     
     private int partition;
     
     private long beginOffset;
     
     private long endOffset;
 
  
    public MessageListener(String topicName, int partition, long beginOffset, long endOffset) {
        super();
        this.topicName = topicName;
        this.partition = partition;
        this.beginOffset = beginOffset;
        this.endOffset = endOffset;
    }
 

    @KafkaListener(topics = "topicName", 
    concurrency = "20",
    clientIdPrefix = "clientId-Test",
    groupId = "clientId-group")
    public void handleMessage(final ConsumerRecord<Object, Object> consumerRecord) {
        System.out.println(consumerRecord);
    }

}

选项 2:

       public List<String> searchMessages(String topicName, int partitionNo, long beginOffset, long endOffset) {

    List<String> filteredMessages = new ArrayList<>();
    TopicPartition tp = new TopicPartition("topicName", partitionNo);
    Properties clusterOneProps = kafkaConsumerConfig.getConsumerProperties();
    KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(clusterOneProps);

    try {
        consumer.subscribe(Collections.singletonList("topicName"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // TODO Auto-generated method stub
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // TODO Auto-generated method stub
            
            consumer.seek(tp, beginOffset); 
        }
    }); 
    Thread.sleep(100);    
    boolean flag = true;
    System.out.println("search started......from offset is "+beginOffset);
    while(flag) {   
    ConsumerRecords<String, Object> crs = consumer.poll(Duration.ofMillis(100L));
     for (ConsumerRecord<String, Object> record : crs) {
              // search criteria
               if(record.value().toString().contains("01111") && record.value().toString().contains("2021-11-06")) {
                   System.out.println("founddddddddddddddddddddddddddddddddddddddd "+record.offset());
                   filteredMessages.add(record.value().toString());
               }
               if (record.offset() == endOffset) {
                   flag = false;
                   break;
               }
        }
     }
     System.out.println("doneeeeeeeeeeeeeeeee");
    }catch(Exception e) {
        e.printStackTrace();
    }finally {
        consumer.close();
    }

【问题讨论】:

  • 只要把这个MessageListener 当作一个bean。将其注入您的控制器并从您的 MVC 端调用该方法!否则,不清楚您是否仍想访问 Kafka。请详细说明您的真正目标是什么。 @KafkaListener 确实打算在从主题中提取记录时由 Kafka 侦听器容器调用。手动调用这个方法是什么原因,目前还不清楚……
  • @ArtemBilan - 我的目标是“我有一个包含 20 个分区的主题并且有员工数据。从 UI 搜索屏幕中,我将传递多个员工编号,现在我想搜索所有这些分区以找到一个没有特定的员工数据。如果匹配,那么我想放入一个单独的列表并作为文件下载。我提前知道每个分区的 beginOffset 和 endOffest。并寻找我可以执行此操作的效率/速度。跨度>

标签: spring-kafka


【解决方案1】:

现在我想搜索所有这些分区以查找特定员工的数据是否存在。

那么@KafkaListener 不适合你。当请求完成时,看起来您的要求是按需的。 @KafkaListener 本身就是一个东西:它依赖于下面的不断轮询循环。它完全不依赖于最终用户的请求。

您需要考虑根据您在 WEB 请求中收到的数据即时创建KafkaConsumer。请参阅ConsumerFactory,而不是直接使用它来创建这些消费者。然后您可以在该消费者上调用poll() 并最终关闭它。这已经超出了 Apache Kafka 的 Spring 范围:https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting

或者...您可以更新到最新的spring-kafka-2.8.0 并使用KafkaTemplate.receive() API:https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-template-receive

【讨论】:

  • 我尝试创建 KafkaConsumer 并通过查找 beginOffset 来轮询记录。但由于 KafkaConsumer 不是线程安全的,我一次只能运行一个分区。所以要完成 20 个分区需要更长的时间。请在问题(选项 2)中查看我的逻辑。如何以更快的方式实现这一目标。如何使用 KafkaConsumer 实现多线程?
  • 你不能。 KafkaConsumer 仅用于单线程。看起来你所说的不是为 Kafka 作为商店而设计的。考虑在两者之间有一些数据库。 Kafka 是消息传递代理,无法通过简单的方式对每个请求进行搜索。
猜你喜欢
  • 1970-01-01
  • 2019-09-16
  • 1970-01-01
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-05
相关资源
最近更新 更多