【问题标题】:Using Python's reduce() to join multiple PySpark DataFrames使用 Python 的 reduce() 加入多个 PySpark DataFrames
【发布时间】:2017-07-07 18:32:44
【问题描述】:

有谁知道为什么使用 Python3 的 functools.reduce() 会导致在加入多个 PySpark DataFrames 时比使用 for 循环迭代加入相同的 DataFrames 时性能更差?具体来说,这会导致速度大幅下降,然后出现内存不足错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

而这个没有:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

任何想法将不胜感激。谢谢!

【问题讨论】:

    标签: python python-3.x pyspark spark-dataframe pyspark-sql


    【解决方案1】:

    一个原因是 reduce 或 fold 通常在功能上是纯的:每个累加操作的结果不会写入内存的同一部分,而是写入新的内存块。

    原则上垃圾收集器可以在每次累加后释放前一个块,但如果没有,您将为累加器的每个更新版本分配内存。

    【讨论】:

      【解决方案2】:

      只要您使用 CPython(不同的实现可以但实际上不应该在这种特定情况下表现出明显不同的行为)。如果您查看reduce implementation,您会发现它只是一个 for 循环,具有最少的异常处理。

      核心完全等同于你使用的循环

      for element in it:
          value = function(value, element)
      

      并且没有证据支持任何特殊行为的说法。

      Spark 连接的帧数实际限制的额外简单测试 (joins are among the most expensive operations in Spark)

      dfs = [
          spark.range(10000).selectExpr(
              "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
          )
          for i in range(200)
      ]
      

      在直接for循环之间的时间上没有显着差异

      def f(dfs):
          df1 = dfs[0]
          for df2 in dfs[1:]:
              df1 = df1.join(df2, ["id"])
          return df1
      
      %timeit -n3 f(dfs)                 
      ## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
      

      reduce 调用

      from functools import reduce
      
      def g(dfs):
          return reduce(lambda x, y: x.join(y, ["id"]), dfs) 
      
      %timeit -n3 g(dfs)
      ### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
      

      类似的整体 JVM 行为模式在 for-loop 之间具有可比性

      For loop CPU and Memory Usage - VisualVM

      reduce

      reduce CPU and Memory Usage - VisualVM

      最后两者都生成相同的执行计划

      g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
          f(dfs)._jdf.queryExecution().optimizedPlan()
      )
      ## True
      

      这表明在评估计划和可能发生 OOM 时没有差异。

      换句话说,您的相关性并不意味着因果关系,观察到的性能问题不太可能与您用于组合DataFrames 的方法有关。

      【讨论】:

      • 可以像这样使用左连接或外连接:return reduce(lambda x, y: x.join(y, ["id"]), how="left", dfs) ?
      猜你喜欢
      • 2016-10-08
      • 1970-01-01
      • 2016-08-03
      • 2015-07-18
      • 2017-03-31
      • 2020-10-22
      • 1970-01-01
      • 2016-03-06
      相关资源
      最近更新 更多