【问题标题】:How to manage Kafka KStream to Kstream windowed join?如何管理 Kafka KStream 到 Kstream 窗口连接?
【发布时间】:2017-06-02 03:34:23
【问题描述】:

基于apache Kafka docsKStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?

有没有什么好的例子来展示一个窗口化的 KStream-to-kStream 窗口化连接?

就我而言,假设我有 2 个 KStream,kstream1kstream2 我希望能够加入 10 天的 kstream1 到 30 天的 kstream2

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    这是绝对可能的。当您定义 Stream 运算符时,您明确指定连接窗口大小。

    KStream stream1 = ...;
    KStream stream2 = ...;
    long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
    long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
    
    stream1.leftJoin(stream2,
                     ... // add ValueJoiner
                     JoinWindows.of(joinWindowSizeMs)
    );
    
    // or if you want to use retention time
    
    stream1.leftJoin(stream2,
                     ... // add ValueJoiner
                     (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                             .until(windowRetentionTimeMs)
    );
    

    更多详情请见http://docs.confluent.io/current/streams/developer-guide.html#joining-streams

    滑动窗口基本上定义了一个额外的连接谓词。在类似 SQL 的语法中,这将类似于:

    SELECT * FROM stream1, stream2
    WHERE
       stream1.key = stream2.key
       AND
       stream1.ts - before <= stream2.ts
       AND
       stream2.ts <= stream1.ts + after
    

    在此示例中为before == after == joinWindowSizeMs。如果您使用 JoinWindows#before()JoinWindows#after() 显式设置这些值,beforeafter 也可以具有不同的值。

    源主题的保留时间完全独立于指定的windowRetentionTimeMs,该windowRetentionTimeMs 应用于Kafka Streams 自身创建的变更日志主题。窗口保留允许将无序记录彼此连接,即延迟到达的记录(请记住,Kafka 有一个基于 offset 的排序保证,但关于 timestamps ,记录可以乱序)。

    【讨论】:

    • 谢谢,我会检查并在可以运行时接受您的回答。我已经阅读了你提到的大部分例子,但我找不到任何 KStream Windowed join
    • 还有。如何指定不同的窗口大小,因为在我的情况下,我想加入 10 天的 stream-1 和 30 天的 stream-2
    • 对不起这些例子。似乎只有 KTable 连接...(认为也有 KStream-KStream-join)。反正。关于“加入 10 天的 stream-1 和 30 天的 stream-2”:这对于 Kafka Streams 是不可能的,因为 Kafka Streams 只支持 Sliding-Window-Join——你需要一个 Hopping-Window-Join。跨度>
    • 你能否解释一下你的答案,joinWindowSizeMs 和 windowRetentionTimeMs 有什么区别,因为我不能使用 JoinWindow.of(joinWindowSizeMs) .until(windowRetentionTimeMs) 它不接受直到我的输出只是可以使用 JoinWindow.of(joinWindowSizeMs)。除此之外,当您使用计时时,它仅将其应用于第二个流或两者? beucase 基于cwiki.apache.org/confluence/display/KAFKA/… in KStream-KStream Join 从我得到的窗口时间适用于两个流,对吗?
    • 刚刚更新了我的问题。如果你使用 until(),你需要添加一个演员来使它工作(如果即将发布的 0.10.2 版本,我会添加一个修复这个问题)。如果您不了解窗口保留时间,请暂时忽略它——它并不重要。或者请提出一个新问题以保持清洁。将多个问题混合为一个问题是不好的。是的,窗口时间大小适用于两个流。请参阅“类似 SQL”的语句以了解其工作原理。
    【解决方案2】:

    除了 Matthias J. Sax 所说的之外,还有一个流到流(窗口)连接示例,位于: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

    这适用于带有 Apache Kafka 0.10.1 的 Confluent 3.1.x,即截至 2017 年 1 月的最新版本。有关使用较新版本的代码示例,请参阅上面存储库中的 master 分支。

    这是上面代码示例的关键部分(同样,对于 Kafka 0.10.1),稍微适应了您的问题。请注意,此示例恰好演示了 OUTER JOIN。

    long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
    long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
    
    final Serde<String> stringSerde = Serdes.String();
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
    KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
    
    KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
        (impressionValue, clickValue) -> impressionValue + "/" + clickValue,
        // KStream-KStream joins are always windowed joins, hence we must provide a join window.
        JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
        stringSerde, stringSerde, stringSerde);
    
    // Write the results to the output topic.
    impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");
    

    【讨论】:

      猜你喜欢
      • 2018-09-18
      • 2016-12-30
      • 2019-10-26
      • 2020-04-21
      • 2017-01-08
      • 1970-01-01
      • 2018-08-30
      • 1970-01-01
      • 2022-10-24
      相关资源
      最近更新 更多