【问题标题】:Performance tuning in sparkspark中的性能调优
【发布时间】:2020-04-25 16:28:18
【问题描述】:

我正在运行一个处理大约 2 TB 数据的 spark 作业。处理涉及:

  1. 读取数据(avrò 文件)
  2. 在属于地图类型的列上展开
  3. OrderBy 分解列中的键
  4. 过滤DataFrame(我有一个非常小的(7)组键(称为键集),我想过滤df)。我做一个df.filter(col("key").isin(keyset: _*) )
  5. 我将此 df 写入镶木地板(此数据框非常小)
  6. 然后我再次过滤原始数据帧以获取不在键集中的所有键 df.filter(!col("key").isin(keyset: _*) ) 并将其写入镶木地板。这是更大的数据集。

原始的 avro 数据约为 2TB。处理大约需要 1 小时。我想优化它。我在第 3 步之后缓存数据帧,使用 6000 的随机分区大小。min executors = 1000,max = 2000,executor memory = 20 G,executor core = 2。还有其他优化建议吗?左连接会比过滤器性能更好吗?

【问题讨论】:

  • Spark 集群中有多少个节点,使用了什么样的存储驱动器?在一小时内总共读取和写入 4 TB 数据(2 TB 读取 + 2 TB 写入)提供 1.1 GB/s,这取决于配置,可能接近或不接近物理限制。

标签: scala performance apache-spark


【解决方案1】:

在我看来一切都很好。 如果你有小数据集,那么isin 没问题。

1) 确保可以增加核心数。执行器核心=5

不建议每个执行器使用超过 5 个内核。这是基于一项研究,其中任何具有 5 个以上并发线程的应用程序都会开始影响性能。

2) 确保您有良好/统一的分区结构。

示例(仅用于调试目的,不用于生产):

  import org.apache.spark.sql.functions.spark_partition_id
  yourcacheddataframe.groupBy(spark_partition_id).count.show()

这将打印火花分区号和多少条记录 存在于每个分区中。基于此,如果您不想更多并行,您可以重新分区。

3) spark.dynamicAllocation.enabled 可能是另一种选择。

例如:

spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true

以及所有其他必需的道具.....那是该工作的。如果您在 spark-default.conf 中提供这些道具,它将适用于所有工作。

使用上述所有这些选项,您的处理时间可能会缩短。

【讨论】:

  • +1 动态分配是否在同一个 Spark 应用程序/作业中工作?这更多的是跨多个 Spark 应用程序,不是吗?
  • 如果您使用spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true 以及所有其他必需的道具......那就是那个工作。如果您在 spark-default.conf 中提供这些道具,则适用于集群中运行的所有应用程序。
  • 我的意思是问它是否可以在同一个应用程序中重复使用“免费资源”
  • 是的。根据它将分配和工作的未完成工作量。如果您有兴趣,请查看ExecutorAllocationManager.scala 以获得完整和更好的理解。
  • 如果你还好,请注意接受the answer as owner
【解决方案2】:

除上述内容外,根据您的要求和集群提出一些建议:

  1. 如果作业可以在 20g 执行器内存和 5 个核心上运行,则可以通过减少执行器内存并保持 5 个核心来容纳更多工作人员
  2. orderBy 真的是必需的吗? Spark 确保行在分区内排序,而不是在分区之间排序,这通常不是非常有用。
  3. 文件是否需要位于特定位置?如果没有,添加一个
df.withColumn("in_keyset", when( col('key').isin(keyset), lit(1)).otherwise(lit(0)). \
write.partitionBy("in_keyset").parquet(...)

可能会加快操作以防止数据被读取+爆炸2x。 partitionBy 确保键集中的项目与其他键位于不同的目录中。

【讨论】:

  • 文件应该在不同的配置单元表中。这就是我进行两次过滤的原因。我添加了缓存以防止读取两次
  • Spark 确保行在分区内排序是什么意思?它按什么排序?
  • 假设您的数据是[1,2,3,4,5,6,7,8],它被分成两个单独的文件。 part-0001 可以包含[1,3,5,7],而part-0002 可以包含[2,4,6,8]。这些值仅在分区内排序,我不相信 part-0001 保证包含所有最小值。 partitionBy 也可以做你想做的事 - 请参阅 stackoverflow.com/questions/31341498/…
【解决方案3】:

spark.dynamicAllocation.enabled 已启用

分区大小非常不均匀(基于输出 parquet 部分文件的大小),因为我正在执行 orderBy 键并且某些键比其他键更频繁。

keyset 是一个非常小的集合(7 个元素)

【讨论】:

  • 我的意思是退出不平衡。输出 parquet 部分文件的大小差异很大
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多