【问题标题】:Retry logic in kafka consumerkafka消费者中的重试逻辑
【发布时间】:2016-09-01 11:28:48
【问题描述】:

我有一个用例,我使用队列中的某些日志并使用该日志中的一些信息访问一些第三方 API,以防第三方系统没有正确响应,我希望为该特定日志实现重试逻辑.

我可以添加一个时间字段并将消息重新推送到同一个队列,如果它的时间字段有效,即小于当前时间,则该消息将再次被消费,否则再次被推送到队列中。

但是这个逻辑会一次又一次地添加相同的日志,直到重试时间正确并且队列会不必要地增长。

有没有更好的方法在 Kafka 中实现重试逻辑?

【问题讨论】:

  • 如果第三方 API 在您想发送消息 N 时没有响应,那么继续发送消息 N+1、N+2 等并稍后返回消息 N 是否有意义?如果没有,似乎没有必要让 Kafka 帮助重试。只需让您的消费者退后几秒钟、几分钟、几小时,然后再次推送消息 N。
  • 是的,首先我想到了一些类似的方法,在失败的情况下我将寻求消费者到可渗透的偏移量。但是有没有办法限制消费者在 N 单位时间内消费消息?

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

是的,这可能是我也想到的一种直接解决方案。但是这样一来,我们最终会创建许多主题,因为消息处理可能会再次失败。

我通过将此用例映射到 Rabbit MQ 解决了这个问题。在 rabbit MQ 中,我们有重试交换的概念,如果消息处理从交换失败,那么您可以将其发送到带有 TTL 的重试交换。一旦 TTL 过期,消息将移回主交换并准备再次处理。

我可以发布一些示例来解释我们如何使用 Rabbit MQ 实现指数退避消息处理。

【讨论】:

  • 请贴一些例子:)
【解决方案2】:

您可以创建多个重试主题并在那里推送失败的任务。例如,您可以在分钟内创建 3 个具有不同延迟的主题,并轮换单个失败的任务,直到达到最大尝试限制。

‘retry_5m_topic’ — 5分钟后重试

‘retry_30m_topic’ — 30分钟后重试

‘retry_1h_topic’ — 1小时后重试

查看更多详情:https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

【讨论】:

    【解决方案3】:

    在消费者中,如果它抛出异常,则生成另一条尝试号为 1 的消息。因此,下次消费时,它具有尝试号 1 的属性。如果它尝试的次数超过您的次数,则在生产者中处理重试计数,然后停止生产。

    【讨论】:

    • 但是你知道延迟重试的方法吗?例如。再次生产但仅在 5 分钟后消耗?因为如果消耗率很高,那么 10 次重试可以在几毫秒内耗尽。
    • 我看到了这篇文章。希望对你有帮助stackoverflow.com/questions/46665941/…
    • 我看到了这篇文章。希望对 stackoverflow.com/questions/46665941/… 有所帮助。您还可以发送带有当前时间戳的属性。如果超过 5 分钟,则将其与当前时间进行比较。然后处理它。否则产生相同的事件。这会进行不必要的检查。 Thread.sleep 是可以接受的。但我不确定线程​​机制。它可能会产生许多处于睡眠状态的线程。使用前先研究一下
    猜你喜欢
    • 2019-05-09
    • 1970-01-01
    • 2017-11-09
    • 1970-01-01
    • 2021-05-20
    • 1970-01-01
    • 2020-09-19
    • 2017-01-04
    • 2021-08-26
    相关资源
    最近更新 更多