【问题标题】:How can I retry failure messages from kafka?如何重试来自 kafka 的失败消息?
【发布时间】:2019-08-29 07:59:33
【问题描述】:

我的spring-boot application(consumer) 处理来自Apache Kafka 的消息。周期性地,按摩无法处理并且消费者抛出异常。无论如何,消费者提交抵消。 我可以区分Kafka中的成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题:

如何重试失败消息?我知道一些方法,但我不确定它们的正确性。

1) 将偏移量更改为早期。但是这样成功消息也会重试。

2) 当我捕捉到异常时,我将此消息发送到另一个主题(error-topic 例如)。但看起来很难。

3) 别的东西(你的变种)

【问题讨论】:

  • “消费者无论如何都提交了偏移量”是什么意思?您是否启用了spring.kafka.consumer.enable-auto-commit

标签: spring-boot apache-kafka spring-kafka


【解决方案1】:

如果你想要至少一次保证,一般模式如下:

  • 禁用自动提交(将enable.auto.commit设置为false)
  • 消费消息
  • 对于每条消息:

    • 如果没有错误,则提交偏移量
    • 如果出错,请重试多次
    • 如果成功,提交
    • 如果您想放弃,记录或发布到错误队列(用于分析或稍后重试)
  • 重复

【讨论】:

  • 如果我会retry as many times you wish我无法处理其他消息,我的话题会增加
  • 简单的重试不是很有用。立即重试也可能会失败(比如因为外部 API 不可用等)。对于重试,您还应该考虑使用重试主题。
【解决方案2】:

使用SeekToCurrentErrorHandler。它将重新定位偏移量以重播消息(默认为 10 次,但可配置)。

重试次数用尽后,它会调用一个“recoverer”来执行一些操作,例如DeadLetterPublishingRecoverer

【讨论】:

  • 谢谢@gary-russell 如果我想实现某种重新排队行为怎么办?例如,我的应用程序同时处理 20 条消息,然后我想重新排队 1,5 和 7 条消息。我应该将消息显式发布到同一主题,还是可以以某种方式确认这些特定消息?
  • DLPR 可以配置为发布回同一个队列。您不应在单个使用者上同时处理记录。 Kafka 只维护一个偏移量。使用分区实现并发。
【解决方案3】:

您可以在 YourConsumer 配置中进行以下更改:

  1. enable.auto.commit=False

  2. RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(retryInterval);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount));
    
        return retryTemplate;
    }
    
  3. 在你的kafkaListenerContainerFactory:

    setretryTemplate(retryTemplate);
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    

现在,在您的代码中,只要发生异常,只需在您的使用者中抛出异常。每当发生异常时,这不会更新偏移量。并且它将在retryInterval 时间后重试,最多maxRetryCount

如果您想忽略并且不重试某些类型的异常,请创建如下异常映射并将其传递到 SimpleRetryPolicy() 中,如下代码所示:

Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

更多详情请访问此链接:Kafka Error Handling

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-10
    • 1970-01-01
    • 2018-02-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多