【发布时间】:2019-07-03 11:36:43
【问题描述】:
我一直在玩 Apache Beam 的计时器,但无法触发它们。
据我所知,您在 DoFn 中通过以下方式定义 Timer。
@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
我选择了TimeDomain.PROCESSING_TIME,因为我的事件没有分配时间戳,并且希望在窗口完成后立即触发 Timer 的执行。
.apply(
"FixedWindow",
Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()
)
.apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));
我希望有以下计时器,它位于 DoFn 内,它基本上在缓冲区内累积对象,在窗口完成后继续管道并处理事件集...
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
@StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
flush(context, bufferedState, bufferedSizeState);
}
...要成功执行。我是否遗漏了什么或不了解计时器在 Apache Beam 中的工作原理?
【问题讨论】:
标签: java google-cloud-dataflow apache-beam