【发布时间】:2020-04-05 01:37:59
【问题描述】:
我有如下管道:
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
PCollectionTuple productProcessorPT = pipeline
.apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
.fromSubscription(options.getProductSubscription()))
.apply(PRODUCT_WINDOW.getName(), fixedWindow)
.apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
.apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
.apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
.withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));
我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不在乎事件时间戳。
上面的代码每 60 秒发送一次数据到下一次转换,但即使没有新数据进入管道,它也会继续触发/发送(相同的)数据。不知道为什么会这样?
【问题讨论】:
-
嗨@MichaelXiao,欢迎来到 StackOverflow 和你的第一个好问题。由于问题的标题有助于吸引正确的答案,因此可以将“我的案例”替换为更具描述性的内容,例如“仅发送一次相同的数据”
标签: java apache-beam