【问题标题】:Why is FlatMap after GroupByKey in Apache Beam python so slow?为什么 Apache Beam python 中 GroupByKey 之后的 FlatMap 这么慢?
【发布时间】:2017-08-25 21:28:04
【问题描述】:

鉴于键/值对的数据源相对较小 (3,000-10,000),我试图仅处理满足组阈值 (50-100) 的记录。因此,最简单的方法是通过键对它们进行分组、过滤和展开 - 使用 FlatMap 或 ParDo。迄今为止,最大的组只有 1,500 条记录。但这似乎是 Google Cloud Dataflow 生产的严重瓶颈。

有给定的列表

(1, 1) (1, 2) (1, 3) ... (2, 1) (2, 2) (2, 3) ...

运行一组转换以按键过滤和分组:

p | 'Group' >> beam.GroupByKey()
  | 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
  | 'Unwind' >> beam.FlatMap(lambda (key, values): values)

关于如何提高性能的任何想法?感谢您的帮助!

【问题讨论】:

  • 请报告调查结果!如果答案有用,请选择它。

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

这是管道的一个有趣的角落案例。我相信您的问题在于您读取来自GroupByKey 的数据的方式。让我简要介绍一下 GBK 的工作原理。

什么是GroupByKey,以及大数据系统如何实现它

所有大数据系统都实现了对同一键的多个元素进行操作的方法。这在 MapReduce 中称为 reduce,在其他大数据系统中称为 Group By Key 或 Combine。

当您执行GroupByKey 转换时,Dataflow 需要将单个键的所有元素收集到同一台机器中。由于同一键的不同元素可能在不同的机器上处理,因此需要以某种方式对数据进行序列化。

这意味着当您读取来自 GroupByKey 的数据时,您正在访问工作人员的 IO(即不是从内存中),因此您确实希望避免多次读取 shuffle 数据。

这如何转化为您的管道

我相信您的问题是FilterUnwind 都将分别从随机播放中读取数据(因此您将读取每个列表的数据两次)。您要做的是仅读取一次随机播放数据。您可以使用单个FlatMap 来执行此操作,该FlatMap 可以过滤和展开您的数据,而无需从随机播放中重复读取。像这样的:

def unwind_and_filter((key, values)):
  # This consumes all the data from shuffle
  value_list = list(values)
  if len(value_list) > 50:
    yield value_list

p | 'Group' >> beam.GroupByKey()
  | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)

如果这有帮助,请告诉我。

【讨论】:

  • 感谢您的回复!尽管尝试了不同的变化,但它没有效果。问题不在于它分组的速度,而是在 FlatMap 阶段从组中释放展开的元素的速度有多快——每秒两个或三个,而所有其他转换几乎是立即的。而且我知道它不能并行,因为它必须在同一台机器上。
  • 平面图之后你有什么改造?你有工作 ID 吗?
  • 当然这是一个 ParDo,它为每个验证点添加一个随机记录,然后是另一个对远程查询的随机记录进行批处理的 ParDo。职位编号为:2017-08-26_06_10_38-4021415491644394676。
  • 查看您的管道,有FetchWeather 步骤,它消耗的时间最多。您是否从在线服务中获取某些内容?此外,您似乎可能有一个 热键(即具有 PCollection 中大部分元素的单个键)-您认为这也是一种可能性吗?如果您有(热键 + 从外部服务获取),这意味着获取主要是串行完成的,这可能会使您的管道变慢
  • 太棒了!非常感谢您看一看。你是对的,在这个测试中有一个热键,但在生产中,这些键或多或少是均匀分布的。为什么从外部服务中提取是串行完成的?我希望系统能够启动并发的 FetchWeather 进程,以平均划分记录。
猜你喜欢
  • 2021-05-10
  • 2019-02-12
  • 2021-01-16
  • 2011-01-13
  • 1970-01-01
  • 1970-01-01
  • 2016-10-24
  • 2012-11-12
相关资源
最近更新 更多