【发布时间】:2019-01-10 04:47:31
【问题描述】:
在我的 Spring Boot Kafka 应用程序中,我有以下消费者配置:
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
和消费者:
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
如果我理解正确的话,现在我只有一个消费者实例。我想增加帖子消费者的数量,假设有 5 个消费者将消费来自${kafka.topic.post.send} 的不同(不相同)消息,以加快消息消费。
是不是就这么简单,把factory.setConcurrency(5);加到我的postKafkaListenerContainerFactory()上,比如:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
factory.setConcurrency(5);
return factory;
}
还是我需要做一些额外的工作才能实现它?
【问题讨论】:
-
您需要多个
group.id,因此消息会在同一主题的同一组中的各个消费者之间分发。你可以有 1 个微服务来完成这个过程并使用不同的 group.id 运行多个实例 -
或者,您可以在同一个应用程序中定义多个具有不同 group.id 的侦听器,即:第一个
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")第二个@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") -
@Paizo 谢谢,所以我需要将我的
sendPost方法克隆为 number = 具有不同 ID 的所需听众编号,就这样? -
另外,为什么我们有
factory.setConcurrency(5);方法?这种方法的目的是什么? -
我的错我不知道;除了我提到的两种方法之外,并发性也应该可以解决问题,请参阅 docs.spring.io/spring-kafka/reference/htmlsingle
4.1.3 Receiving Messages:If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions
标签: java spring-boot apache-kafka spring-kafka