【问题标题】:Kafka Consumer: Stop processing messages when exception was raisedKafka Consumer:引发异常时停止处理消息
【发布时间】:2018-07-30 20:47:36
【问题描述】:

我对(Spring)Kafka 在停止ConcurrentMessageListenerContainer 之后/时的 poll() 行为有点困惑。

我想要实现的目标: 在引发异常后停止消费者(例如消息无法保存到数据库),不提交偏移量,在给定时间后重新启动它并从先前失败的消息重新开始处理。

我读过这篇文章,它说容器将使用轮询 (https://github.com/spring-projects/spring-kafka/issues/451) 中的剩余记录调用侦听器,这意味着无法保证在失败的消息之后,成功处理的另一条消息将提交抵消。这可能会导致消息丢失/跳过。

真的是这样吗?如果是的话,有没有解决方案可以在不升级新版本的情况下解决这个问题? (DLQ 不是我的解决方案)

我已经做过的事情: 设置setErrorHandler()setAckOnError(false)

private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps,  Class keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    //Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
        props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
        props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
    }

    return props;
}

消费者

public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
    StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();

    ContainerProperties containerProperties = new ContainerProperties(...);
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    containerProperties.setAckOnError(false);
    containerProperties.setErrorHandler(stoppingErrorHandler);

    ConcurrentMessageListenerContainer<String, byte[]> container = ...
    container.setConcurrency(1); //use only one container
    stoppingErrorHandler.setConcurrentMessageListenerContainer(container);

    return container;
}

错误处理程序

public class StoppingErrorHandler implements ErrorHandler {

    @Setter
    private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;

    @Value("${backends.kafka.consumer.halt.timeout}")
    int consumerHaltTimeout;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        if (concurrentMessageListenerContainer != null) {
            concurrentMessageListenerContainer.stop();
        }

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
                    concurrentMessageListenerContainer.start();
                }
            }
        }, consumerHaltTimeout);
    }
}

我正在使用什么:

  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-kafka</artifactId>
  <version>2.1.2.RELEASE</version>

  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.1.7.RELEASE</version>

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    不升级新版本?

    2.1 引入了ContainerStoppingErrorHandler,即ContainerAwareErrorHandler,剩余的未消费消息被丢弃(并在容器重启时重新获取)。

    对于早期版本,您的侦听器将需要拒绝(失败)批处理中的剩余消息(或设置max.records.per.poll=1)。

    【讨论】:

    • 不幸的是,由于某些依赖关系,我们目前(计划中,但在此之前我们需要解决方法)无法升级到最新版本,这意味着我们无法使用 ContainerStoppingErrorHandler。关于性能,您是否有将max.records.per.poll 设置为 1 的经验?顺便说一句,您对“拒绝(失败)剩余消息”有什么想法吗?
    • 它肯定会更慢,但它仍然可能“足够快”以满足您的需求;如果没有,您必须在侦听器中添加检查以查询错误处理程序以查看我们是否处于失败状态,并拒绝批处理中的所有剩余记录。
    • 这是否意味着在 spring-kafka 1.3.* 中为 max.records.per.poll 使用更高的值时,将 ack 模式设置为 RECORD 与将其设置为 BATCH 具有相同的含义?例如,如果我在单次轮询中获取 5 个事件,第一个是好的,第二个失败(所以我叫 stop),并且 3,4,5 是好的 - 在两种 ack 模式下偏移量将移动到最新的,对吗?
    • 不要在 cmets 中就旧答案提出新问题。在 RECORD 和 BATCH 模式下,偏移量将为 2(如果 ackOnError 为真,则为 3)。
    猜你喜欢
    • 2019-12-06
    • 2018-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-29
    • 1970-01-01
    • 2018-06-11
    • 1970-01-01
    相关资源
    最近更新 更多