【发布时间】:2022-01-15 11:09:19
【问题描述】:
我创建了一个 java 类和方法有 @KafkaListener。但我希望只有当我从 UI 中单击按钮时才需要调用此方法。当我从 UI 中单击一个按钮时,调用会转到控制器类,而我想从控制器类调用这个类。怎么能做到这一点? 我没有添加任何类级别的注释。
public class MessageListener implements ConsumerSeekAware {
private String topicName;
private int partition;
private long beginOffset;
private long endOffset;
public MessageListener(String topicName, int partition, long beginOffset, long endOffset) {
super();
this.topicName = topicName;
this.partition = partition;
this.beginOffset = beginOffset;
this.endOffset = endOffset;
}
@KafkaListener(topics = "topicName",
concurrency = "20",
clientIdPrefix = "clientId-Test",
groupId = "clientId-group")
public void handleMessage(final ConsumerRecord<Object, Object> consumerRecord) {
System.out.println(consumerRecord);
}
}
选项 2:
public List<String> searchMessages(String topicName, int partitionNo, long beginOffset, long endOffset) {
List<String> filteredMessages = new ArrayList<>();
TopicPartition tp = new TopicPartition("topicName", partitionNo);
Properties clusterOneProps = kafkaConsumerConfig.getConsumerProperties();
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(clusterOneProps);
try {
consumer.subscribe(Collections.singletonList("topicName"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO Auto-generated method stub
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO Auto-generated method stub
consumer.seek(tp, beginOffset);
}
});
Thread.sleep(100);
boolean flag = true;
System.out.println("search started......from offset is "+beginOffset);
while(flag) {
ConsumerRecords<String, Object> crs = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, Object> record : crs) {
// search criteria
if(record.value().toString().contains("01111") && record.value().toString().contains("2021-11-06")) {
System.out.println("founddddddddddddddddddddddddddddddddddddddd "+record.offset());
filteredMessages.add(record.value().toString());
}
if (record.offset() == endOffset) {
flag = false;
break;
}
}
}
System.out.println("doneeeeeeeeeeeeeeeee");
}catch(Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
【问题讨论】:
-
只要把这个
MessageListener当作一个bean。将其注入您的控制器并从您的 MVC 端调用该方法!否则,不清楚您是否仍想访问 Kafka。请详细说明您的真正目标是什么。@KafkaListener确实打算在从主题中提取记录时由 Kafka 侦听器容器调用。手动调用这个方法是什么原因,目前还不清楚…… -
@ArtemBilan - 我的目标是“我有一个包含 20 个分区的主题并且有员工数据。从 UI 搜索屏幕中,我将传递多个员工编号,现在我想搜索所有这些分区以找到一个没有特定的员工数据。如果匹配,那么我想放入一个单独的列表并作为文件下载。我提前知道每个分区的 beginOffset 和 endOffest。并寻找我可以执行此操作的效率/速度。跨度>
标签: spring-kafka