【问题标题】:Spark: Transforming multiple dataframes in parallelSpark:并行转换多个数据帧
【发布时间】:2020-09-19 21:27:24
【问题描述】:

了解如何在并行转换多个数据帧时实现最佳并行度

我有一个路径数组

val paths = Array("path1", "path2", .....

我正在从每个路径加载数据帧,然后转换并写入目标路径

paths.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

转换processData 独立于我正在加载的数据框。

这限制了一次处理一个数据帧,我的大部分集群资源都是空闲的。由于处理每个数据帧是独立的,我将 Array 转换为 scala 的 ParArray

paths.par.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

现在它在集群中使用了更多资源。我仍在尝试了解它的工作原理以及如何在这里微调并行处理

    1234563 ?
  1. 它如何影响像 EventLoggingListnener 这样的集中式 spark 事物,因为并行处理多个数据帧,它需要处理更多的事件流入。

  2. 为了优化资源利用率,我考虑了哪些参数。

  3. 任何其他方法

我可以通过任何资源来了解这种缩放将非常有帮助

【问题讨论】:

  • 你们不同的数据框是否共享相同的架构?
  • @Oli 他们不共享相同的架构。但是用于转换的列是通用的。
  • 你有group by 之类的转换还是只选择和withColumn?
  • 是的,有广泛的转变。 rank over partition by
  • 我有一个想法,它可能有效或无效,取决于您的转换。让我知道 :) 您可能需要在聚合、窗口等中添加 path 列...

标签: apache-spark


【解决方案1】:

这很慢的原因是 Spark 非常擅长对存储在一个大数据帧中的大量数据进行并行计算。但是,它在处理大量数据帧方面非常糟糕。它将使用它的所有执行器(即使它们不是全部需要)开始计算,并等待它完成,然后再开始下一个。这会导致许多不活动的处理器。这很糟糕,但这不是 spark 的设计目的。

我有一个窍门给你。可能需要对其进行一些改进,但你会有这个想法。这就是我要做的。从路径列表中,我将提取 parquet 文件的所有模式并创建一个新的大模式来收集所有列。然后,我会要求 spark 使用此模式读取所有 parquet 文件(不存在的列将自动设置为 null)。然后我将合并所有数据帧并对这个大数据帧执行转换,最后使用partitionBy 将数据帧存储在单独的文件中,同时仍然并行执行所有这些操作。它看起来像这样。

// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id|  x |
+---+----+
|  0|   0|
|  1|  10|
|  2|  20|
+---+----+

val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  1|
+---+---+

// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
    .flatMap(path => spark.read.parquet(path).schema.fields)
    .toSet //removing duplicates
    .toArray
val big_schema = StructType(fields)

// and let's use it
val dfs = paths.map{ path => 
    spark.read
        .schema(big_schema)
        .parquet(path)
        .withColumn("path", lit(path.split("/").last))
}

// The we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id|   x|   y|      file|
+---+----+----+----------+
|  1|   1|null|d1.parquet|
|  2|   2|null|d1.parquet|
|  0|   0|null|d1.parquet|
|  0|null|   0|d2.parquet|
|  1|null|   1|d2.parquet|
+---+----+----+----------+

然而,我不建议在大量数据帧上使用 unionAll。由于 spark 对执行计划的分析,对于许多数据帧,它可能会非常慢。我会使用 RDD 版本,虽然它更冗长。

val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds, 
    big_schema.add(StructField("path", StringType, true)))
transform(big_df)
    .write
    .partitionBy("path")
    .parquet("hdfs:///tmp/processed.parquet")

看看我处理过的目录,我明白了:

hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet

【讨论】:

  • 感谢您的方法。我喜欢尝试构建一个主数据框并对其进行转换而不是处理许多单独的数据框的想法。我会尝试为我的问题扩展它。
  • 很高兴我能帮上忙。如果您遇到问题,请不要犹豫,提出一个新问题,其中包含有关您的转换细节的更多详细信息。如果您在此处发布问题的链接,我会尽力帮助您。
【解决方案2】:

你应该在这里玩一些变量。最重要的是:CPU 内核、每个 DF 的大小和一点期货的使用。提议是决定要处理的每个 DF 的优先级。您可以使用 FAIR 配置,但这还不够,并行处理可能会占用集群的很大一部分。您必须为 DF 分配优先级并使用 Future pooll 来控制在您的应用中运行的并行作业的数量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-22
    • 2016-10-03
    • 1970-01-01
    • 2016-09-27
    • 2019-01-16
    相关资源
    最近更新 更多