【问题标题】:Apache Beam Session Window with limit on number of events具有事件数量限制的 Apache Beam 会话窗口
【发布时间】:2021-07-05 04:06:32
【问题描述】:

我们需要在用户 10 分钟内没有活动时生成用户会话,但需要进行以下调整

  • 我们还想在会话中的事件数达到 20k 时结束会话
  • 从窗口中的第一个事件开始 4 小时后结束会话

对于 10 分钟的会话窗口 - 我们已完成以下工作:

'User Session Window' >> beam.WindowInto(window.Sessions(10 * 60),                                                    
                        timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)

我需要知道如何添加其他两个条件 - 这样如果任何一个条件成功 - 我们想要发出行

【问题讨论】:

    标签: google-cloud-dataflow apache-beam stream-processing windowing


    【解决方案1】:

    为此,您可以基于现有的Sessions 编写自定义WindowFn,该BoundedWindow 使用跟踪元素计数的BoundedWindow 来跟踪这些额外的约束。请注意,这会增加管道的不确定性,因为发出的确切窗口集对元素出现的顺序很敏感。例如

    class CountingIntervalWindow(BoundedWindow):
      def __init__(self, start, end, count):
        ...
    
    class BoundedSessions(WindowFn):
      def assign(self, context):
        # type: (WindowFn.AssignContext) -> List[CountingIntervalWindow]
        timestamp = context.timestamp
        return [CountingIntervalWindow(timestamp, timestamp + self.gap_size, 1)]
    
      def get_window_coder(self):
        return PickleCoder()
    
      def merge(self, merge_context):
        # type: (WindowFn.MergeContext) -> None
        end = MIN_TIMESTAMP
        to_merge = []
        running_count = 0
        for w in sorted(merge_context.windows, key=lambda w: w.start):
          if to_merge:
            if (# Overlap
                end > w.start
                # Not too many elements.
                and running_count < self.count_limit
                # Not too large
                and w.end - to_merge[0].start < self.time_limit):
              to_merge.append(w)
              running_count += w.count
              if w.end > end:
                end = w.end
            else:
              if len(to_merge) > 1:
                merge_context.merge(
                    to_merge,
                    CountingIntervalWindow(to_merge[0].start, end, running_count))
              to_merge = [w]
              end = w.end
              running_count = w.count
          else:
            to_merge = [w]
            end = w.end
            running_count = w.count
        if len(to_merge) > 1:
          merge_context.merge(
              to_merge,
              CountingIntervalWindow(to_merge[0].start, end, running_count))
    
    

    【讨论】:

    • 非常感谢@robertwb 的回答。我试过这个,但没有得到预期的结果。大多数情况下,我认为因为“w.end - to_merge[0].start
    • 使用您提出的解决方案,它只会在会话窗口超时时触发 - 而不是在 2 个条件中的任何其他条件下触发
    • w.end 应该继续前进,直到 w.end - to_merge[0].start &lt; self.time_limit 不再为真,这将导致它落入 else 子句并开始一个新窗口。
    • 你跑的是什么跑步者?
    • 我在 directrunner 上测试这个。在生产中,我们在数据流 gcp 上运行它
    猜你喜欢
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    • 2013-07-25
    • 2021-09-29
    • 2021-04-13
    • 2021-11-11
    相关资源
    最近更新 更多