【发布时间】:2020-10-19 23:09:24
【问题描述】:
如果在消息消费期间抛出错误,我要做的是暂停KafkaConsumer。
这是我写的
@KafkaListener(...)
public void consume(
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String,String> consumer,
@Payload String message) {
try {
//consumer message
} catch(Exception e) {
saveConsumer(consumer);
consumer.pause();
}
}
然后我写了一个REST服务来恢复消费者
@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
@PostMapping("/resume")
public void resume() {
KafkaConsumer<String,String> consumer = getConsumer();
if(consumer != null) {
consumer.resume(consumer.paused());
}
}
}
现在,我有两个问题。
第一个问题:当我从 @KafkaListener 注释方法调用 consumer.pause() 时会发生什么?
消费者立即暂停,或者我可以接收与同一主题分区的其他偏移量相关的其他消息。
例如,我有偏移量为 3 的“message1”和偏移量为 4 的“message2”,“message1”导致异常,“message2”会发生什么?还是被消耗掉了吗?
第二个问题:从 REST 服务恢复消费者给出 ConcurrentModificationException,因为 KafkaConsumer 不是线程安全的。那么,我为什么要这样做呢?
【问题讨论】:
标签: apache-kafka kafka-consumer-api spring-kafka