【问题标题】:How to get the max timestamp of the current sliding window如何获取当前滑动窗口的最大时间戳
【发布时间】:2016-11-01 19:03:51
【问题描述】:

我正在使用 X 大小和 Y 周期的滑动时间窗口。为了标记每个窗口的输出,我想获取PCollection当前窗口的时间戳。

    PCollection<T> windowedInput = input
      .apply(Window<T>into(
          SlidingWindows.of(Duration.standardMinutes(10))
                        .every(Duration.standardMinutes(1))));

   // Extract key from each input and run a function per group.
   //
   // Q: ExtractKey() depends on the window triggered time.
   //    How can I pass the timestamp of windowedInputs to ExtractKey()?
   PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInputs
     .apply(ParDo.of(new ExtractKey()))
     .apply(GroupByKey.<K, Ts>create());

   // Run Story clustering and write outputs.
   //
   // Q: Also I'd like to add a window timestamp suffix to the output.
   //    How can I pass (or get) the timestamp to SomeDoFn()?
   PCollection<String> results = groupedInputs.apply(ParDo.of(new SomeDoFn()));

【问题讨论】:

    标签: google-cloud-dataflow


    【解决方案1】:

    DoFn 可以通过@ProcessElement 方法上的可选BoundedWindow 参数访问当前元素的窗口:

    class SomeDoFn extends DoFn<KV<K, Iterable<T>>, String> {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow window) {
        ...
      }
    }
    

    【讨论】:

    • 感谢您的建议。我会尝试。有没有类似的方法可以在代码 sn-p 中向 ExtractKey() 提供窗口信息?
    • 当然 - 只需将 BoundedWindow 参数添加到您的 ExtractKey 的 ProcessElement 方法。
    猜你喜欢
    • 2017-08-26
    • 2016-08-01
    • 1970-01-01
    • 2011-02-16
    • 2019-12-04
    • 2021-12-04
    • 1970-01-01
    • 2020-02-14
    • 2016-05-31
    相关资源
    最近更新 更多