【问题标题】:@OnTimer not triggering after window@OnTimer 在窗口后未触发
【发布时间】:2019-07-03 11:36:43
【问题描述】:

我一直在玩 Apache Beam 的计时器,但无法触发它们。

据我所知,您在 DoFn 中通过以下方式定义 Timer。

@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

我选择了TimeDomain.PROCESSING_TIME,因为我的事件没有分配时间戳,并且希望在窗口完成后立即触发 Timer 的执行。

        .apply(
             "FixedWindow",
            Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )
        .apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));

我希望有以下计时器,它位于 DoFn 内,它基本上在缓冲区内累积对象,在窗口完成后继续管道并处理事件集...

        @OnTimer("expiry")
    public void onExpiry(
        OnTimerContext context,
        @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
        @StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
        flush(context, bufferedState, bufferedSizeState);
    }

...要成功执行。我是否遗漏了什么或不了解计时器在 Apache Beam 中的工作原理?

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam


    【解决方案1】:

    您可以查看[1] 哪里有计时器使用示例。

    您需要设置计时器应该何时触发[2],可能是错过的地方。

    [1]https://beam.apache.org/blog/2017/08/28/timely-processing.html

    [2]https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java#L53

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-09-14
      • 1970-01-01
      • 1970-01-01
      • 2014-01-17
      • 2020-07-16
      • 1970-01-01
      • 2017-11-08
      • 1970-01-01
      相关资源
      最近更新 更多