【问题标题】:Odd behaviour of SlidingWindows when used with TestPipeline与 TestPipeline 一起使用时 SlidingWindows 的奇怪行为
【发布时间】:2016-11-10 12:30:09
【问题描述】:

我有一个简单的测试,演示了与 TestPipeline 一起使用时滑动窗口的奇怪行为。基本上,一堆字符串被输入,然后它们在滑动窗口中累积,然后应用总和聚合来计算重复项,最后记录聚合函数的输出。对于 10 分钟持续时间和 5 分钟周期的滑动窗口,我预计只有一个窗口用于存储所有元素(因为新窗口在第一个窗口后 5 分钟内启动)...

public class SlidingWindowTest {
    private static PipelineOptions options = PipelineOptionsFactory.create();
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTest.class);

    private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
        implements DoFn.RequiresWindowAccess{
        @Override
        public void processElement(ProcessContext processContext) throws Exception {
            KV<String, Integer> item = processContext.element();
            LOG.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
            LOG.info("~~~~~~~~~~~ {}", processContext.window());
            processContext.output(item);
        }
    }

    @Test
    public void whatsWrongWithSlidingWindow() {
        Pipeline p = TestPipeline.create(options);

        p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
            .apply(MapElements.via((String item) -> KV.of(item, 1))
                       .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
            .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
                                                        .every(Duration.standardMinutes(5))))
            .apply(Sum.integersPerKey())
            .apply(ParDo.of(new IdentityDoFn()));

        p.run();
    }
}

但我得到了 8 个窗口被触发。 TestPipeline 或者我对滑动窗口应该如何工作的理解有问题吗?

12:19:04.566 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
12:19:04.566 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)

P/S:Dataflow sdk 版本:1.8.0

【问题讨论】:

    标签: google-cloud-dataflow sliding-window


    【解决方案1】:

    预期的行为与您观察到的不同,但也与您的预期不同:

    • 首先,您有三个不同的键,因此如果它们都落入一个窗口中,那么您会期望三个输出。
    • 对于 10 分钟的滑动窗口和 5 分钟的周期,每个元素必然落入两个窗口。如果一个元素在1 分钟到达,它会落入从010 的窗口以及从-55 的窗口。因此,您应该期望 六个 输出值,每个键两个。将窗口视为随着管道运行而更新的东西是一个常见的陷阱,而实际上它们只是输入 data 的计算属性,而不是其到达时间或管道执行的属性。
    • Create 转换将输出时间戳为 BoundedWindow.TIMESTAMP_MIN_VALUE 的所有值,因此它们都应该落入相同的两个窗口中。

    您的示例似乎表明了一个真正的错误。 "a1b2c3" 应该不可能在它落入的两个不相交的窗口中,"abc" 也不应该落入三个窗口,其中两个是不相交的。

    不过,顺便说一句,您会受益于查看 DataflowAssert(现在在 Beam 中称为 PAssert)以一致且跨运行器的方式测试 PCollection 的内容。

    【讨论】:

    • 感谢 Kenn,这是一个很好的答案!我已经在使用 DataflowAssert,事实上这就是我发现滑动窗口这种奇怪行为的方式。您认为我最好将错误报告写到哪里:数据流或梁?
    • 由于您使用的是 Dataflow SDK 1.8.0,我认为您应该向github.com/GoogleCloudPlatform/DataflowJavaSDK 报告它,以强调它在那里有影响。然后,我们很可能会将其复制到 Beam JIRA,在 Beam 中修复它,然后进行反向移植。但我们会根据具体情况采取这些措施。
    猜你喜欢
    • 2023-03-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-11
    • 1970-01-01
    • 1970-01-01
    • 2016-07-08
    • 1970-01-01
    相关资源
    最近更新 更多