【发布时间】:2017-05-04 00:19:51
【问题描述】:
我正在对每 5 分钟推进的 4 小时窗口进行一次跳跃窗口聚合。由于跳跃窗口重叠,我得到了具有不同聚合值的重复键。
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)
如何消除重复数据的重复键或仅选择具有最新值的键。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
我正在对每 5 分钟推进的 4 小时窗口进行一次跳跃窗口聚合。由于跳跃窗口重叠,我得到了具有不同聚合值的重复键。
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)
如何消除重复数据的重复键或仅选择具有最新值的键。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
2021 年 5 月更新:如今,Kafka Streams API 通过 suppress() 运算符支持 "final" window results。有关详细信息,请参阅之前的文档链接以及 2019 年 3 月的博客 Kafka Streams’ Take on Watermarks and Triggers。
定义窗口计算后,您可以抑制中间结果,在窗口关闭时为每个用户发出最终计数。
KGroupedStream<UserId, Event> grouped = ...;
grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((windowedUserId, count) -> count < 3)
.toStream()
.foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
原始答案(在不使用上述suppress() 运算符时仍然适用):
如果我理解正确,那么这是预期的行为。您没有看到“重复”键,但您会看到同一键的持续更新。
思考:
# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...
# With record cache enabled, you would see sth like this.
alice->23, alice->59, alice->100, ...
查看http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management 的解释,其中更详细地描述了这一点。如果您希望看到每个记录键的“重复”更少,您可以在应用程序的配置中通过cache.max.bytes.buffering aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 增加记录缓存的大小(使用 DSL 时)。还有与commit.interval.ms的相互作用。
如果您想知道“为什么 Kafka Streams API 首先会以这种方式运行”,我建议您阅读本周早些时候发布的博文 https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/。
【讨论】:
除了迈克尔所写的之外,在跳窗中还有另一层“重复”。由于窗口重叠,从后续窗口发出的值可能相同。
例如,假设您有一个 5 分钟的窗口和一分钟的跳跃:{0..5},{1..6},{2..7} 等等。来自输入主题的给定记录可能属于不同的时间窗口。
这与翻转窗口相反,其中窗口不重叠,因此每条记录都是单个窗口的一部分。不幸的是,翻滚窗口并不适合所有用例。一个示例可以是聚合,其中具有相同键的两条记录位于两个后续窗口的边缘。
使用跳跃窗口时,有多种“去重”方法。一种方法是在下游“重复数据删除”。另一种方法是在 Kafka Streams 中执行此操作,但这仅与特定拓扑相关。正如所解释的,这些结果不是真正的重复,而是连续窗口的结果。如果您只想要某个键的最后一个窗口的结果,您可以编写如下内容:
windowedKtable
.toStream((windowedKey, value) -> windowedKey.key())
.groupByKey()
.reduce((value1, value2) -> value1.lastestActivity() > value2.lastestActivity() ? value1 : value2)
我不会说这是最佳做法,只是在非常特殊的情况下解决问题的一种方法。
关于 Kafka Streams 中的窗口化的更多信息: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
【讨论】:
使用跳窗时,一个键同时存在多个时间窗。当新的日志产生时,聚合会同时改变这些时间窗口的状态,因此在应用toStream()时,对下游主题产生了去重日志。
要获取最新时间窗口的结果,您必须提供filter() 来过滤最新时间窗口更改日志,这里举例说明如何在使用跳跃窗口时获取最新窗口聚合结果。
【讨论】:
suppress() 最终得到了最终结果。