【问题标题】:Not able to Pause Spring Kafka Container无法暂停 Spring Kafka 容器
【发布时间】:2021-09-23 19:48:05
【问题描述】:

我想暂停 Kafka Listener。

  1. 我调用 http://localhost:8080/pause 来暂停容器
  2. 我发送有关该主题的数据。
  3. 消费者开始消费消息

我预计消费者会被暂停,但仍在从 Kafka 主题中读取消息。 我怎样才能暂停消费者? 我正在使用 2.5.0 (Spring Boot Parent)

     @KafkaListener(id="foo" ,topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
        

@Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        
          @GetMapping("/pause")
            public void pause(  )
            {
                
                System.out.println(" Pausing Kafka Listener");
                kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
            }
         
        
          @Configuration
    @EnableKafka
    public class KafkaConsumerConfig implements KafkaListenerConfigurer {
        
        
        @Autowired
        private LocalValidatorFactoryBean validator;
        
       
        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
              registrar.setValidator(this.validator);
            
        }
        

【问题讨论】:

  • 你用的是什么版本?我们刚刚修复了 2.7.x (today) 中的一个错误,在该错误中执行了暂停但随后立即恢复。对于早期版本(以及修复后),在处理完之前轮询的所有记录之前,暂停不会生效。如果您希望暂停立即生效,请设置max.poll.records=1
  • 使用 KafkaListenerEndpointRegistry.getListenerContainer(id) 方法获取对容器的引用。将其转换为 ConcurrentMessageListenerContainer 并调用 getContainers() 以获取子 KafkaMessageListenerContainers 的列表;然后您可以单独暂停/恢复它们
  • @Gary Russell 这不是答案吗?
  • 是的,所有 2.7.x 版本中都有一个错误。引导 2.5(当前为 2.5.2)引入 2.7.x;下周的 2.5.3 版本应该有修复(spring-kafka 2.7.4)。修复在快照 2.7.4-SNAPSHOT 中,您可以通过将 repo.spring.io/snapshots 添加到构建配置中来获得它。 2.7.4 应该在星期一可用(并在本周晚些时候启动 2.5.3)。您也可以暂时降级到 2.6.9 (spring-kafka)。 @RanLupovich 不;暂停并发容器将暂停所有子容器。
  • 快照仓库是repo.spring.io/snapshot(没有)。正确的;当您暂停/恢复容器时,所有主题/分区都会暂停/恢复。但是,2.7 添加了暂停单个 TopicPartitions 的功能(通过 pausePartition()resumePartition()(这就是打破容器暂停的原因)。

标签: apache-kafka kafka-consumer-api spring-kafka kafka-producer-api


【解决方案1】:

在所有 2.7.x 版本中都有一个错误。修复了 today 容器在暂停后立即恢复的问题。

引导 2.5(当前为 2.5.2)引入 2.7.x;下周的 2.5.3 版本应该有修复(spring-kafka 2.7.4)。

修复在快照 2.7.4-SNAPSHOT 中,您可以通过将 https://repo.spring.io/snapshot 添加到构建配置来获得。

2.7.4 应该在星期一可用(并在本周晚些时候启动 2.5.3)。

您也可以暂时降级到 2.6.9 (spring-kafka)。

【讨论】:

    猜你喜欢
    • 2020-11-11
    • 2018-06-23
    • 2021-03-12
    • 2019-04-27
    • 2021-12-24
    • 2020-02-08
    • 1970-01-01
    • 2018-07-17
    • 2019-02-11
    相关资源
    最近更新 更多