【问题标题】:Pause and Resume KafkaConsumer暂停和恢复 KafkaConsumer
【发布时间】:2020-10-19 23:09:24
【问题描述】:

如果在消息消费期间抛出错误,我要做的是暂停KafkaConsumer

这是我写的

@KafkaListener(...)
public void consume(
 @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String,String> consumer,
 @Payload String message) {
  
    try {
        //consumer message
    } catch(Exception e) {
        saveConsumer(consumer);
        consumer.pause();
    }
}

然后我写了一个REST服务来恢复消费者

@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
    @PostMapping("/resume")
    public void resume() {
        KafkaConsumer<String,String> consumer = getConsumer();
        if(consumer != null) {
            consumer.resume(consumer.paused());
        }
    }
}

现在,我有两个问题。 第一个问题:当我从 @KafkaListener 注释方法调用 consumer.pause() 时会发生什么? 消费者立即暂停,或者我可以接收与同一主题分区的其他偏移量相关的其他消息。 例如,我有偏移量为 3 的“message1”和偏移量为 4 的“message2”,“message1”导致异常,“message2”会发生什么?还是被消耗掉了吗?

第二个问题:从 REST 服务恢复消费者给出 ConcurrentModificationException,因为 KafkaConsumer 不是线程安全的。那么,我为什么要这样做呢?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    不要直接暂停消费者;而是暂停容器。

    @KafkaListener(id = "foo", ...)
    
    @Autowired KafkaListenerEndpointRegistry;
    
    ...
    
    registry.getListenerContainer("foo").pause();
    

    暂停会在下次轮询前生效;如果您想立即暂停(并且不处理上次轮询的剩余记录),请在暂停后抛出异常(假设您使用的是现在默认的 SeekToCurrentErrorHandler

    【讨论】:

    • 谢谢。改为恢复?我需要根据自定义逻辑恢复并寻找消费者。
    • 但是我不完全理解暂停容器和暂停消费者之间的区别。如果 KafkaListener 被配置为监听两个或多个主题或消费者监听不同的分区会发生什么?我只会暂停/恢复无法正确处理消息的消费者,而不是所有消费者。
    • 使用端点注册表恢复容器,方法相同;如果您需要在恢复侦听器之前执行搜索,请扩展 AbstractConsumerSeekAwaredocs.spring.io/spring-kafka/docs/2.6.2/reference/html/#seek
    • @KafkaListener@Repeatable;如果你想独立控制消费者,你应该使用多个注解。
    • 不幸的是,这对我没有帮助。我不能使用多个注释。我仍然不明白为什么KafkaConsumer 作为那个逻辑,我看到同一个线程可以调用它的方法(一部分wakeup)。 Doc 说 KafkaConsumer 不是线程安全的,但似乎它根本不支持多线程。
    猜你喜欢
    • 2013-07-19
    • 2015-07-28
    • 2014-12-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多