【问题标题】:Kafka Stream Topology on multiple instances多个实例上的 Kafka 流拓扑
【发布时间】:2018-09-11 17:25:38
【问题描述】:

我们有一个可以在多台机器上工作的流拓扑。我们将时间窗聚合结果存储到状态存储中。 由于状态存储存储的是本地数据,我认为应该在另一个主题上进行聚合以进行整体聚合。 但似乎我遗漏了一些东西,因为这些示例都没有在另一个 KStream 或处理器上进行整体聚合。

我们是否需要使用 groupBy 逻辑来存储整体聚合,或者使用 GlobalKtable 或者只是在某个地方实现我们自己的合并代码?

什么是正确的架构?

在下面的代码中,我尝试使用常量键将所有进入处理器的消息分组,以便将整体聚合存储在一台机器上,但我认为它会失去 Kafka 提供的并行性。

dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
        .filter((key, event) -> event != null && event.getClientCreationDate() != null);

 dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
       .groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
       .windowedBy(timeWindow)
       .count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));

【问题讨论】:

  • 只要您需要对键进行分组,您就可以有效地将所有数据集中到一个处理器中,而不管所使用的框架如何(例如,如果您在 Spark 中做了同样的事情) .目前还不清楚为什么需要对所有内容进行分组并“重新键入”它
  • 所以你说,我们不必手动合并来自同一拓扑的不同实例的结果?
  • 不清楚您将获得或期望作为输出的数据。但是,如果您想要并行化某些东西,理想情况下它不会是单词只是“count”的“wordcount”示例。例如,您可能想按event.getName()event.getClientUuid() 或类似方式进行计数,但就像我说的,我不知道您的数据

标签: apache-kafka apache-kafka-streams stream-processing


【解决方案1】:

在下面的代码中,我尝试使用常量键将所有进入处理器的消息分组,以便将整体聚合存储在一台机器上,但我认为它会失去 Kafka 提供的并行性。

这似乎是正确的方法。是的,你失去了并行性,但这就是全局聚合的工作方式。最后,一台机器必须计算它......

不过,您可以改进的是采用两步方法:即,首先通过“随机”键并行聚合,然后使用只有一个键的第二步将部分聚合“合并”成一个。这样,计算的某些部分是并行的,只有最后一步(希望减少数据负载)是非并行的。使用 Kafka Streams,您需要“手动”实现此方法。

【讨论】:

  • 那么,我们应该将reducer处理器部署为一个单独的模块而不是扩展它吗?
  • 你会有两个减速器。您仍然可以将两者部署在一个应用程序中——如果您在第一个 reduce 之后设置了一个单例键,那么所有数据都将转到第二个 reducer 的单个实例。
猜你喜欢
  • 2021-09-28
  • 2019-10-31
  • 2018-01-19
  • 2015-10-05
  • 2021-04-24
  • 1970-01-01
  • 2015-08-10
  • 2017-06-09
  • 1970-01-01
相关资源
最近更新 更多