【发布时间】:2016-06-23 07:05:41
【问题描述】:
我正在使用 Spark SQL 1.6.0 创建处理管道。该管道由步骤/转换组成,一个步骤的输出转发到下一个步骤。在最后一步之后,生成的 DataFrame 保存在 HDFS 中。我还需要在一些中间步骤保存结果。这样做的代码是:
saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame
这里,previousDataFrame 是最后一步的结果,saveDataFrame 只是将 DataFrame 保存为给定位置,然后 previousDataFrame 将用于下一步/转换。最后一步之后,它将保存在 HDFS 中。 saveDataFrame 的代码是:
implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
if (format.equalsIgnoreCase("csv")) {
source
.write
.mode(saveMode)
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.save(path)
}
else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
source
.write
.mode(SaveMode.Overwrite)
.format(format)
.save(path)
}
else {
throw new Exception("%s input format is not supported".format(format))
}}
这很好用,只有 spark 应用程序需要比平时更长的时间。如果保存中间输出应用程序在 20 分钟内运行,那么使用此代码需要 1 小时。虽然根据 Spark UI,作业和任务在 20 分钟内完成,但 spark-submit 过程会持续运行到 1 小时。
请帮助找出结果。我还尝试了以下两种可能的解决方案:
- 使用Future创建多线程调用saveDataFrame。
- 在保存之前缓存 previousDataFrame 并将其重用于下一步。
【问题讨论】:
-
是的,出于某种原因,在 Spark 中保存数据帧非常缓慢。您可能想看看我给here 的答案——尽管投诉是关于写入S3,但我猜想
OutputCommiter和SUCCES文件的建议仍然适用于HDFS。如果您在 AWS 上运行,您可以将需要持久化的 rdds 放到 Kineses 上,并使用 Firehose 存储它。这样一来,您的 Spark 应用程序就不会因写入而变慢。 -
如果您不在 AWS 上运行,也许您仍然可以访问某种队列?或者你可以实现它——这个想法是在你的核心应用程序之外处理写入。
-
谢谢,在您的回复的帮助下,我找到了问题,这是因为 AWS S3。许多输出之一是存储在 S3 上,这会减慢整个执行速度。将该输出移至 HDFS 后,执行在 10 分钟内完成。
标签: hadoop apache-spark spark-dataframe