一次项目想,多线程消费主题的中的数据,百度了一波之后,可以按分区进行单独消费,记录一下

首先,传统按照主题消费:

 1 @KafkaListener(topics = {Constants.KAFKA_TOPIC_PISHH}, containerFactory = "kafkaListenerMonitorFactory")
 2     public void listenDataInfo(List<ConsumerRecord> records, Acknowledgment ack) {
 3         try {
 4             long beginTime = System.currentTimeMillis();
 5             batchList.clear();
 6             for (ConsumerRecord record : records) {
 7                 count++;
 8                 Optional<?> kafkaMessage = Optional.ofNullable(record.value());
 9                 //获取kafka消息
10                logger.info(kafkaMessage.get().toString());
11             }
12         } catch (Exception e){
13             e.printStackTrace();
14         } finally {
15             ack.acknowledge();//手动提交偏移量
16         }
17     }

 

按照主题分区消费:

 1 @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = Constants.KAFKA_TOPIC_PISHH, partitions = { "0" }) }, containerFactory = "kafkaListenerMonitorFactory")
2 public void listenDataInfo0(List<ConsumerRecord> records, Acknowledgment ack) { 3 try { 4 long beginTime = System.currentTimeMillis(); 5 batchList.clear(); 6 for (ConsumerRecord record : records) { 7 count++; 8 Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 9 //获取kafka消息 10 logger.info(kafkaMessage.get().toString()); 11 } 12 13 } catch (Exception e){ 14 e.printStackTrace(); 15 } finally { 16 ack.acknowledge();//手动提交偏移量 17 } 18 }

 

参考:https://blog.csdn.net/russle/article/details/81258590

 

相关文章:

  • 2021-05-01
  • 2021-11-18
  • 2021-07-28
  • 2021-12-31
  • 2022-01-25
  • 2022-12-23
  • 2021-08-28
  • 2021-11-30
猜你喜欢
  • 2021-10-22
  • 2021-11-20
  • 2021-12-08
  • 2021-12-21
  • 2021-06-05
  • 2021-12-12
  • 2021-11-18
相关资源
相似解决方案