【问题标题】:Retry a message in KafkaStreams Topology在 KafkaStreams 拓扑中重试消息
【发布时间】:2020-06-24 13:12:51
【问题描述】:

我有一个 kafkaStreams 拓扑,其中有一个处理器 API。在处理器内部,有一个调用外部 API 的逻辑。

如果 API 返回 503,则需要重试消息。

现在,我正在尝试将此消息推送到不同的 kafka 主题并使用“Punctuate”方法每分钟从失败的主题中提取一批消息,重试。

有没有更好的方法/方法来解决这个问题?

【问题讨论】:

  • 1) 如果 1 分钟后仍然得到 503,会发生什么?在这种情况下,您的逻辑不会夸大失败的主题吗? 2) 需要异步重试吗?
  • @Peyman - 目标是使用指数退避重试算法重试。因此,对于每个重试计数,目标主题都会不同(retry-1、retry-2、retry-3、....)。也会异步重试,并且“prevRetrytime”小于预期的“nextRetryTime”,会被放回主题中。

标签: apache-kafka apache-kafka-streams retry-logic


【解决方案1】:

另一种稳健的方法是使用状态存储。它们由 Kafka 支持为压缩的变更日志主题。

您可以将失败的消息存储在状态存储中,并通过调用 schedule (punctuate) 将它们全部处理,然后删除所有成功处理的消息。

例如:

public class MyProcessor {

    private final long schedulerIntervalMs = 60000;
    private final String entityStoreName = "failed-message-store";
    private KeyValueStore<String, Object> entityStore;

    @Override
    public void init(ProcessorContext context) {
        this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName);
        context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> processFailedMessagesStore());
    }

    @Override
    public void process(String key, Object value) {
        boolean apiCallSuccessful = // call API

        if (!apiCallSuccesfull) {
            entityStore.put(key, value);
        }
    }

    private void processFailedMessagesStore() {
        try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
            allItems.forEachRemaining(item -> {
                boolean successfullyProcessed = // re-process
                
                if (successfullyProcessed) {
                    entityStore.delete(item.key);
                }
            });
        }
    }
}

【讨论】:

  • 感谢您的回答。是的,我开始阅读 KafkaStreams 中可用的各种 StateStore(PersistentStore、GlobalStore、KVStore 等)。我担心的是,如果 pod 重新启动,StateStore 中包含的数据会被清除。好像不是这样的:)
  • 是的,数据被持久化到磁盘和压缩的 Kafka 主题中。如果 Pod 被清除并取而代之,新的 kafka 流应用程序会将压缩主题中的所有状态加载到磁盘上(这可能需要一段时间,具体取决于消息的数量)
猜你喜欢
  • 1970-01-01
  • 2011-05-08
  • 2017-06-09
  • 2016-02-06
  • 2017-10-11
  • 2020-04-19
  • 2018-08-08
  • 1970-01-01
  • 2018-07-19
相关资源
最近更新 更多