【发布时间】:2019-11-21 14:52:21
【问题描述】:
我们正在开发一个应用程序,该应用程序从不同来源获取数据,一旦数据可用,我们就会对其进行处理,将其放在一起,然后继续将其转移到不同的主题。
在我们的例子中,我们有 3 个主题,每个主题都将带来与来自不同主题的数据有关系的数据,在这种情况下,生成的每个实体可以同时接收或不接收(或很短的时间),这就是问题出现的时候,因为在我们继续进入主题之前,需要将这 3 个实体合二为一。
我们的想法是创建一个单独的主题,该主题将包含所有尚未处理的数据,然后有一个单独的线程以固定的时间间隔检查该主题并检查该主题的依赖关系可用,如果它们可用,那么我们从这个单独的主题中删除这个实体,如果没有,我们将这个实体保留在那里,直到它得到解决。
在所有这些解释的最后,我的问题是,以这种方式这样做是否合理,或者 Kafka 提供了其他好的实践或策略来解决这种情况?
【问题讨论】:
-
我建议使用 Procesor API 和状态存储。对于每个实体,您将其存储在状态存储中,并为每个新输入记录更新它,直到实体完成 - 对于这种情况,您将其从存储中删除并
context.forward()它。查看 Kafka Streams 文档以了解有关处理器 API 的更多详细信息。
标签: apache-kafka kafka-consumer-api apache-kafka-streams