【问题标题】:Best practice at the moment of processing data with dependencies in Kafka?在Kafka中处理具有依赖关系的数据时的最佳实践?
【发布时间】:2019-11-21 14:52:21
【问题描述】:

我们正在开发一个应用程序,该应用程序从不同来源获取数据,一旦数据可用,我们就会对其进行处理,将其放在一起,然后继续将其转移到不同的主题。

在我们的例子中,我们有 3 个主题,每个主题都将带来与来自不同主题的数据有关系的数据,在这种情况下,生成的每个实体可以同时接收或不接收(或很短的时间),这就是问题出现的时候,因为在我们继续进入主题之前,需要将这 3 个实体合二为一。

我们的想法是创建一个单独的主题,该主题将包含所有尚未处理的数据,然后有一个单独的线程以固定的时间间隔检查该主题并检查该主题的依赖关系可用,如果它们可用,那么我们从这个单独的主题中删除这个实体,如果没有,我们将这个实体保留在那里,直到它得到解决。

在所有这些解释的最后,我的问题是,以这种方式这样做是否合理,或者 Kafka 提供了其他好的实践或策略来解决这种情况?

【问题讨论】:

  • 我建议使用 Procesor API 和状态存储。对于每个实体,您将其存储在状态存储中,并为每个新输入记录更新它,直到实体完成 - 对于这种情况,您将其从存储中删除并context.forward() 它。查看 Kafka Streams 文档以了解有关处理器 API 的更多详细信息。

标签: apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

根据保留政策,Kafka 消息可能会在一段时间后变得干净,因此您需要将消息存储在某处:

我可以看到以下选项,但总是每个问题都有可能的方法和解决方案:

  1. 已处理所有消息并将“未处理消息”转发到其他主题,例如 A
  2. Kafka 处理器 API 使用来自主题 A 的消息并存储到状态存储中
  3. 安排一个带有时间间隔的 punctuate() 方法
  4. 迭代存储在状态存储的所有消息。
  5. 检查依赖关系(如果可用)从状态存储中删除消息并对其进行处理或发布回原始主题以再次处理。

您可以参考以下链接以供参考 https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

【讨论】:

  • 好吧,通过主题压缩,您可以间接删除一条消息...
猜你喜欢
  • 2018-08-27
  • 1970-01-01
  • 1970-01-01
  • 2019-08-24
  • 2021-11-03
  • 2015-07-29
  • 2022-08-09
  • 1970-01-01
相关资源
最近更新 更多