【发布时间】:2018-09-11 17:25:38
【问题描述】:
我们有一个可以在多台机器上工作的流拓扑。我们将时间窗聚合结果存储到状态存储中。 由于状态存储存储的是本地数据,我认为应该在另一个主题上进行聚合以进行整体聚合。 但似乎我遗漏了一些东西,因为这些示例都没有在另一个 KStream 或处理器上进行整体聚合。
我们是否需要使用 groupBy 逻辑来存储整体聚合,或者使用 GlobalKtable 或者只是在某个地方实现我们自己的合并代码?
什么是正确的架构?
在下面的代码中,我尝试使用常量键将所有进入处理器的消息分组,以便将整体聚合存储在一台机器上,但我认为它会失去 Kafka 提供的并行性。
dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
.filter((key, event) -> event != null && event.getClientCreationDate() != null);
dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
.groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(timeWindow)
.count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));
【问题讨论】:
-
只要您需要对键进行分组,您就可以有效地将所有数据集中到一个处理器中,而不管所使用的框架如何(例如,如果您在 Spark 中做了同样的事情) .目前还不清楚为什么需要对所有内容进行分组并“重新键入”它
-
所以你说,我们不必手动合并来自同一拓扑的不同实例的结果?
-
不清楚您将获得或期望作为输出的数据。但是,如果您想要并行化某些东西,理想情况下它不会是单词只是“count”的“wordcount”示例。例如,您可能想按
event.getName()或event.getClientUuid()或类似方式进行计数,但就像我说的,我不知道您的数据
标签: apache-kafka apache-kafka-streams stream-processing