【发布时间】:2021-09-23 19:48:05
【问题描述】:
我想暂停 Kafka Listener。
- 我调用 http://localhost:8080/pause 来暂停容器
- 我发送有关该主题的数据。
- 消费者开始消费消息
我预计消费者会被暂停,但仍在从 Kafka 主题中读取消息。 我怎样才能暂停消费者? 我正在使用 2.5.0 (Spring Boot Parent)
@KafkaListener(id="foo" ,topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@GetMapping("/pause")
public void pause( )
{
System.out.println(" Pausing Kafka Listener");
kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
}
@Configuration
@EnableKafka
public class KafkaConsumerConfig implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
【问题讨论】:
-
你用的是什么版本?我们刚刚修复了 2.7.x (today) 中的一个错误,在该错误中执行了暂停但随后立即恢复。对于早期版本(以及修复后),在处理完之前轮询的所有记录之前,暂停不会生效。如果您希望暂停立即生效,请设置
max.poll.records=1。 -
使用 KafkaListenerEndpointRegistry.getListenerContainer(id) 方法获取对容器的引用。将其转换为 ConcurrentMessageListenerContainer 并调用 getContainers() 以获取子 KafkaMessageListenerContainers 的列表;然后您可以单独暂停/恢复它们
-
@Gary Russell 这不是答案吗?
-
是的,所有 2.7.x 版本中都有一个错误。引导 2.5(当前为 2.5.2)引入 2.7.x;下周的 2.5.3 版本应该有修复(spring-kafka 2.7.4)。修复在快照 2.7.4-SNAPSHOT 中,您可以通过将 repo.spring.io/snapshots 添加到构建配置中来获得它。
2.7.4应该在星期一可用(并在本周晚些时候启动 2.5.3)。您也可以暂时降级到 2.6.9 (spring-kafka)。 @RanLupovich 不;暂停并发容器将暂停所有子容器。 -
快照仓库是repo.spring.io/snapshot(没有)。正确的;当您暂停/恢复容器时,所有主题/分区都会暂停/恢复。但是,2.7 添加了暂停单个
TopicPartitions 的功能(通过pausePartition()和resumePartition()(这就是打破容器暂停的原因)。
标签: apache-kafka kafka-consumer-api spring-kafka kafka-producer-api