【问题标题】:Apache beam processing sequential elementsApache 光束处理顺序元素
【发布时间】:2026-01-07 01:25:05
【问题描述】:

我有来自不同设备的 GPS 坐标流。这些坐标可能会乱序到达,但它们都有一个事件时间。我想从 GPS 坐标计算总距离。

来自 Beam 编程指南:

组合函数应该是可交换的和关联的,因为该函数不一定在具有给定键的所有值上只调用一次

输入:

driver_id,x,y,event_time,processing_time
1,1,1,100001,20001
1,5,5,100004,20002
1,4,5,100003,20003

预期输出:

driver_id, distance, event_time
1,3,100001
1,1,100003

由于顺序很重要,我不能使用CombineFn 之类的东西。

所以我的问题是:

  1. 如何使用 Apache Beam 计算迟到事件的距离?
  2. 如何处理位于两个不同窗口中的连续坐标?
  3. 还有其他替代流处理框架可以解决我的问题吗?
  4. 活动可能会在活动时间后两天送达,但我想在确保它们全部送达之前获得一个大概的结果。

【问题讨论】:

  • 你能添加一个示例输入和想要的输出吗?
  • @Iñigo 编辑了我的问题。
  • 埃里克回答你的问题了吗?
  • @Pablo 还没有。我不知道如何处理两个窗口之间的连续事件。

标签: google-cloud-dataflow apache-beam


【解决方案1】:

我建议你花点时间学习如何处理late arrive data within in Windows

您可以创建一个固定窗口,然后允许延迟数据;但是,您还必须定义如何处理临时累积。

根据您的密钥结构,也许您可​​以键入驱动程序,然后提供更具体的启发式来确定完整性。

第 1 步 - 了解 Windows(各种类型)以及如何处理迟到的数据。注意:您需要使用特定的时钟时间标记您的传入元素,而不是使用默认接收时间。

【讨论】:

    【解决方案2】:

    您可以获得这样的事件列表:

    lists_of_events = (p
      | ReadFromPubSub()
      | Map(lambda x: (x['driver'], x))
      | WindowInto(Sessions(SESSION_GAP), 
                   allowed_lateness=ALLOWED_LATENESS,
                   accumulation_mode=ACCUMULATING)
      | GroupByKey())
    

    这将返回一个 PCollection,其中包含带有驱动程序 ID 的元组和每个窗口的事件列表(类型:Tuple[str, Iterable[Dict]])。

    然后您将编写一个函数来使用它:

    def calculate_distance(elm):
      driver, events = elm
      events = sorted(events, key=lambda x: x['timestamp'])
      # Now calculate distances
      distance, ev_time = ....
      yield {'driver': driver, 'distance': distance, 'event_time': ev_time}
    
    lists_of_events | Map(calculate_distance)
    

    【讨论】: