【问题标题】:Apache Beam/Java, how to set window/trigger that sends the data only once for each windowApache Beam/Java,如何设置每个窗口只发送一次数据的窗口/触发器
【发布时间】:2020-04-05 01:37:59
【问题描述】:

我有如下管道:

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes();

PCollectionTuple productProcessorPT = pipeline
  .apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
    .fromSubscription(options.getProductSubscription()))
  .apply(PRODUCT_WINDOW.getName(), fixedWindow)
  .apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
  .apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
  .apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
    .withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));

我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不在乎事件时间戳。

上面的代码每 60 秒发送一次数据到下一次转换,但即使没有新数据进入管道,它也会继续触发/发送(相同的)数据。不知道为什么会这样?

【问题讨论】:

  • 嗨@MichaelXiao,欢迎来到 StackOverflow 和你的第一个好问题。由于问题的标题有助于吸引正确的答案,因此可以将“我的案例”替换为更具描述性的内容,例如“仅发送一次相同的数据”

标签: java apache-beam


【解决方案1】:

您可以删除触发,只需使用 FixedWindows 如下所示每 60 秒发出一次记录

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));

这将使用延迟事件的默认触发和处理,这基本上意味着数据在窗口结束时发出并且所有延迟事件都被忽略。

【讨论】:

  • 使用默认触发器是我尝试过的第一件事,但奇怪的是如果我只使用默认触发器,那么转换'ProductCombiner'根本不会有数据输出。我正在考虑使用如下窗口/触发器:Window&lt;String&gt; fixedWindow = Window.&lt;String&gt;into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration())))) .withAllowedLateness(Duration.ZERO) .discardingFiredPanes(); 这会有副作用吗?
  • 您能否详细说明一下 transform ProductCombiner 没有数据输出是什么意思?您是否建议使用默认触发器发布 GroupBy 不会发出任何记录?
  • 是的,如果我使用默认 trigger,则发布 GroupBy 不会发出任何记录,这意味着整个管道都不会输出任何记录。如果我在帖子中使用窗口/触发器,我可以获得 GroupBy 后的输出,但问题是ProductCombiner 的 PCollection.out 中的元素计数永远不会下降,从日志中我看到ProductCombiner 处理第一个元素成功,然后尝试一次又一次地处理相同的数据并获得异常,这让我觉得我帖子中的触发器在某些情况下如何继续触发。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-12-16
  • 1970-01-01
  • 2018-05-25
  • 1970-01-01
  • 1970-01-01
  • 2020-09-02
  • 2018-05-31
相关资源
最近更新 更多