【问题标题】:Optimization in pysparkpyspark 中的优化
【发布时间】:2016-11-29 15:55:35
【问题描述】:

我在 pyspark 中编写代码,在其中进行过滤、加入、unionall、groupby 操作。

但我注意到在 groupby 操作之前,count() 函数需要 9 分钟,而在 groupby 之后,count() 函数需要 35 分钟。我需要一些关于如何优化 groupby 子句以减少处理时间的建议。

配置参数: 执行器内存 12g 执行人数 50 执行器核心 5 驱动内存 40g 数据大小约 1 TB

以sn-p为例:

dataframeA = hc.sql("select * from tableA")



dataframeB = hc.sql("select * from tableB")

 dataframeC = hc.sql("select * from tableC")



dataframeD = dataframeA.unionAll(dataframeB)



dataframeE = dataframeD.join(dataframeC, col1 == col2, 'left_outer')



dataframeF = dataframeE.groupby ([col1, col2]
                   .agg({
                       max("col3"),
                       sum("col4")/ sum("col5"),
                       ...
                       })

有什么建议吗?

【问题讨论】:

  • 能够帮助您发布您的代码(或至少是它的简化案例)
  • 添加了代码sn-p供参考。

标签: pyspark aggregate


【解决方案1】:

您可以考虑使用reduceByKey 代替groupByKey

groupByKey 会在集群之间打乱所有数据,消耗大量资源,但reduceByKey 会先减少每个集群中的数据,然后打乱减少的数据。

【讨论】:

  • 是的,我怀疑 group by 也发生了过度洗牌。但是在我使用所有数据帧并且聚合中有大约 100 列的情况下,是否可以在此处应用 reducebykey?
  • 在你的情况下,DataFrame 的groupBy 已经过优化,所以我不认为reduceByKey 真的有效。
  • 有什么办法可以减少group by和聚合操作中的shuffle?
【解决方案2】:

逻辑本身似乎没问题。不过,您可以尝试以下几件事:

你有一个 join 和一个 groupby,通常会建议一些 shuffle。您可以尝试减少执行器的数量,为每个执行器提供更多的内存和内核。

在您的 groupby 中,您正在使用键 [col1, col2]。这些列与连接中的列相同吗?如果是这样,那么它们本质上是同一列,不同之处在于左侧数据帧上没有键。因此,您只能使用 col2 并希望优化器会改进您的第二次洗牌(使用原始洗牌)。

【讨论】:

  • 在我的例子中,有 3 个连接,其中一个与 group by 的键相同。在这种情况下,减少执行者的数量是行不通的,它需要相同的执行时间。
  • 我从 Web UI 注意到的是,23 个阶段中的最后 4 个阶段花费了一半以上的时间。增加分区呢?
  • 如果您有处理它们的核心,增加分区的数量通常会有所帮助。它基本上增加了并行度......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-09-17
  • 1970-01-01
  • 2021-12-04
  • 2021-05-22
  • 2021-12-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多