【发布时间】:2018-07-30 20:47:36
【问题描述】:
我对(Spring)Kafka 在停止ConcurrentMessageListenerContainer 之后/时的 poll() 行为有点困惑。
我想要实现的目标: 在引发异常后停止消费者(例如消息无法保存到数据库),不提交偏移量,在给定时间后重新启动它并从先前失败的消息重新开始处理。
我读过这篇文章,它说容器将使用轮询 (https://github.com/spring-projects/spring-kafka/issues/451) 中的剩余记录调用侦听器,这意味着无法保证在失败的消息之后,成功处理的另一条消息将提交抵消。这可能会导致消息丢失/跳过。
真的是这样吗?如果是的话,有没有解决方案可以在不升级新版本的情况下解决这个问题? (DLQ 不是我的解决方案)
我已经做过的事情:
设置setErrorHandler() 和setAckOnError(false)
private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps, Class keyDeserializer) {
Map<String, Object> props = new HashMap<>();
//Set common props
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
if (kafkaProps.isSslEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
}
return props;
}
消费者
public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();
ContainerProperties containerProperties = new ContainerProperties(...);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
containerProperties.setAckOnError(false);
containerProperties.setErrorHandler(stoppingErrorHandler);
ConcurrentMessageListenerContainer<String, byte[]> container = ...
container.setConcurrency(1); //use only one container
stoppingErrorHandler.setConcurrentMessageListenerContainer(container);
return container;
}
错误处理程序
public class StoppingErrorHandler implements ErrorHandler {
@Setter
private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;
@Value("${backends.kafka.consumer.halt.timeout}")
int consumerHaltTimeout;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
if (concurrentMessageListenerContainer != null) {
concurrentMessageListenerContainer.stop();
}
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
concurrentMessageListenerContainer.start();
}
}
}, consumerHaltTimeout);
}
}
我正在使用什么:
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.2.RELEASE</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.7.RELEASE</version>
【问题讨论】:
标签: java apache-kafka kafka-consumer-api spring-kafka