【问题标题】:Session windows in Apache Beam with pythonApache Beam 中的会话窗口与 python
【发布时间】:2019-03-20 13:32:57
【问题描述】:

我有一个用户事件流。我已将它们映射到 KV{ userId, event },并分配了时间戳。

这是在流模式下运行。我希望能够创建以下输入输出结果:

会话窗口间隙=1

  • 输入:user=1, timestamp=1, event=a
  • 输入:user=2, timestamp=2, event=a
  • 输入:user=2, timestamp=3, event=a
  • 输入:user=1, timestamp=2, event=b
  • 时间:lwm=3
  • 输出:user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
  • 时间:lwm=4
  • 输出:user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]

这样我就可以编写我的函数来为用户减少会话窗口中的事件列表以及会话窗口的开始和结束时间。

我该怎么写? (如果您回答;“查看示例”,这不是一个有效的答案,因为它们从不将事件列表提供给以窗口为参数的减速器)

【问题讨论】:

    标签: python apache-beam


    【解决方案1】:

    如果我理解正确,这将是question 的后续行动,自然而然地通过添加我在解决方案中建议的 Group By Key 步骤来完成。

    所以,如果我们有这样的管道,请参考我之前的解释并在这里只关注更改:

    events = (p
      | 'Create Events' >> beam.Create(user1_data + user2_data) \
      | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
      | 'keyed_on_user_id'      >> beam.Map(lambda x: (x['user_id'], x))
      | 'user_session_window'   >> beam.WindowInto(window.Sessions(session_gap),
                                                 timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
      | 'Group' >> beam.GroupByKey() \
      | 'analyze_session'         >> beam.ParDo(AnalyzeSession()))
    

    现在元素已按照您在问题描述中的描述进行排列,因此我们可以简单地将它们登录到AnalyzeSession

    class AnalyzeSession(beam.DoFn):
      """Prints per session information"""
      def process(self, element, window=beam.DoFn.WindowParam):
        logging.info(element)
        yield element
    

    获得想要的结果:

    INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
    INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
    INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
    INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
    

    如果您想避免冗余信息,例如将user_idtimestamp 作为值的一部分,可以在Map 步骤中删除它们。 根据完整的用例(即减少每个会话级别的聚合事件),我们可以使用以下方式计算事件数量或会话持续时间:

    class AnalyzeSession(beam.DoFn):
      """Prints per session information"""
      def process(self, element, window=beam.DoFn.WindowParam):
        user = element[0]
        num_events = str(len(element[1]))
        window_end = window.end.to_utc_datetime()
        window_start = window.start.to_utc_datetime()
        session_duration = window_end - window_start
    
        logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)
    
        yield element
    

    对于我的示例,它将输出以下内容:

    INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
    INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
    INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
    INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
    

    完整代码here

    【讨论】:

    • 谢谢@Guillem — 我已经接受了这个作为答案,但由于 Beam 周围经验丰富且缺乏社区,我已经搬到了 Flink。我/以前从未使用过的人的统计数据:16h Beam,未能执行上述操作,6h Flink,在第一次尝试中成功使用会话窗口,并且还可以刷新/清除窗口并根据数据控制窗口“前瞻”。
    猜你喜欢
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 2021-12-22
    • 2021-04-13
    • 1970-01-01
    • 2021-11-11
    • 2021-11-25
    • 1970-01-01
    相关资源
    最近更新 更多