【问题标题】:Kafka Streams - Hopping windows - deduplicate keysKafka Streams - 跳跃窗口 - 去重键
【发布时间】:2017-05-04 00:19:51
【问题描述】:

我正在对每 5 分钟推进的 4 小时窗口进行一次跳跃窗口聚合。由于跳跃窗口重叠,我得到了具有不同聚合值的重复键。

TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)

如何消除重复数据的重复键或仅选择具有最新值的键。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    2021 年 5 月更新:如今,Kafka Streams API 通过 suppress() 运算符支持 "final" window results。有关详细信息,请参阅之前的文档链接以及 2019 年 3 月的博客 Kafka Streams’ Take on Watermarks and Triggers

    定义窗口计算后,您可以抑制中间结果,在窗口关闭时为每个用户发出最终计数。

    KGroupedStream<UserId, Event> grouped = ...;
    
    grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
           .count()
           .suppress(Suppressed.untilWindowCloses(unbounded()))
           .filter((windowedUserId, count) -> count < 3)
           .toStream()
           .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
    

    原始答案(在不使用上述suppress() 运算符时仍然适用):

    如果我理解正确,那么这是预期的行为。您没有看到“重复”键,但您会看到同一键的持续更新。

    思考:

    # Extreme case: record caches disabled (size set to 0)
    alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...
    
    # With record cache enabled, you would see sth like this.
    alice->23, alice->59, alice->100, ...
    

    查看http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management 的解释,其中更详细地描述了这一点。如果您希望看到每个记录键的“重复”更少,您可以在应用程序的配置中通过cache.max.bytes.buffering aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 增加记录缓存的大小(使用 DSL 时)。还有与commit.interval.ms的相互作用。

    如果您想知道“为什么 Kafka Streams API 首先会以这种方式运行”,我建议您阅读本周早些时候发布的博文 https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

    【讨论】:

    • 一切都好。我阅读了这篇文章并理解了这个概念。然而,出于报告目的,我计划将聚合结果推送到不支持更新的 ClickHouse 等数据库。这意味着我需要进行预选以仅过滤掉最终结果。这会扼杀性能......
    • @Macchitow 你是怎么做到的?我也有同样的问题。
    【解决方案2】:

    除了迈克尔所写的之外,在跳窗中还有另一层“重复”。由于窗口重叠,从后续窗口发出的值可能相同。 例如,假设您有一个 5 分钟的窗口和一分钟的跳跃:{0..5},{1..6},{2..7} 等等。来自输入主题的给定记录可能属于不同的时间窗口。

    这与翻转窗口相反,其中窗口不重叠,因此每条记录都是单个窗口的一部分。不幸的是,翻滚窗口并不适合所有用例。一个示例可以是聚合,其中具有相同键的两条记录位于两个后续窗口的边缘。

    使用跳跃窗口时,有多种“去重”方法。一种方法是在下游“重复数据删除”。另一种方法是在 Kafka Streams 中执行此操作,但这仅与特定拓扑相关。正如所解释的,这些结果不是真正的重复,而是连续窗口的结果。如果您只想要某个键的最后一个窗口的结果,您可以编写如下内容:

    windowedKtable
    .toStream((windowedKey, value) -> windowedKey.key())
    .groupByKey()
    .reduce((value1, value2) -> value1.lastestActivity() > value2.lastestActivity() ? value1 : value2)
    

    我不会说这是最佳做法,只是在非常特殊的情况下解决问题的一种方法。

    关于 Kafka Streams 中的窗口化的更多信息: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

    【讨论】:

      【解决方案3】:

      使用跳窗时,一个键同时存在多个时间窗。当新的日志产生时,聚合会同时改变这些时间窗口的状态,因此在应用toStream()时,对下游主题产生了去重日志。

      要获取最新时间窗口的结果,您必须提供filter() 来过滤最新时间窗口更改日志,这里举例说明如何在使用跳跃窗口时获取最新窗口聚合结果。

      hopping window aggregation with latest window result

      【讨论】:

      • 嘿,如果使用时间戳提取器呢?此解决方案不适用于自定义时间戳提取器。
      • Michael G. Noll 的回答比较合适,我用suppress() 最终得到了最终结果。
      猜你喜欢
      • 2021-08-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-10
      • 2020-09-05
      相关资源
      最近更新 更多