【问题标题】:Kafka stream how to test a sliding window?Kafka流如何测试滑动窗口?
【发布时间】:2020-04-02 19:17:45
【问题描述】:

我有一个像这样测试滑动窗口的测试:

.groupByKey
.windowedBy {
        TimeWindows.of(Duration.ofMinutes(10))
          .advanceBy(Duration.ofMinutes(1)).grace(Duration.ofMillis(0))
      }
      .aggregate(...)
      .suppress(Suppressed.untilWindowCloses(unbounded()))

我需要在一个步骤后测试滑动窗口是否产生正确的输出。 我做了这样的事情:

for (i <- 0 to 8) testDriver.pipeInput(record..., T0 to T8)// produce one record every minute (9 records)

for (i <- 0 to 8)testDriver.pipeInput(record..., T8 + 5min) // these comes with a timestamp > 10 minutes tso there are on a second window

第一个循环应该在第一个时间窗口内产生结果,在滑动 1 分钟后,第二个循环应该产生第二个记录,其中第二个窗口的结果(滑动后) 我该如何检查?我不明白如何使用输出阅读器来检查这两个结果。

【问题讨论】:

    标签: scala apache-kafka streaming apache-kafka-streams


    【解决方案1】:

    只要您期望输出可用,您就可以调用testDriver.readOutput()。当然,您需要将结果写入输出主题,例如,

    ...
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
    .to("output-topic");
    

    【讨论】:

    • 如果我在 10 分钟内每分钟发送一条记录,并且我有一个跳跃窗口和一个聚合产生一个结果,就像我在我的问题中写的那样(10 分钟,步长为 1 分钟)。我希望有 10 个输出,对吧?
    • 听起来对我来说是正确的(假设每条记录都有相同的键)。
    猜你喜欢
    • 1970-01-01
    • 2011-12-16
    • 1970-01-01
    • 2016-03-13
    • 2012-03-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-18
    相关资源
    最近更新 更多