【问题标题】:Saving intermediate result in Spark在 Spark 中保存中间结果
【发布时间】:2016-06-23 07:05:41
【问题描述】:

我正在使用 Spark SQL 1.6.0 创建处理管道。该管道由步骤/转换组成,一个步骤的输出转发到下一个步骤。在最后一步之后,生成的 DataFrame 保存在 HDFS 中。我还需要在一些中间步骤保存结果。这样做的代码是:

saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame

这里,previousDataFrame 是最后一步的结果,saveDataFrame 只是将 DataFrame 保存为给定位置,然后 previousDataFrame 将用于下一步/转换。最后一步之后,它将保存在 HDFS 中。 saveDataFrame 的代码是:

implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
if (format.equalsIgnoreCase("csv")) {
  source
    .write
    .mode(saveMode)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .save(path)
}
else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
  source
    .write
    .mode(SaveMode.Overwrite)
    .format(format)
    .save(path)
}
else {
  throw new Exception("%s input format is not supported".format(format))
}}

这很好用,只有 spark 应用程序需要比平时更长的时间。如果保存中间输出应用程序在 20 分钟内运行,那么使用此代码需要 1 小时。虽然根据 Spark UI,作业和任务在 20 分钟内完成,但 spark-submit 过程会持续运行到 1 小时。

请帮助找出结果。我还尝试了以下两种可能的解决方案:

  • 使用Future创建多线程调用saveDataFrame。
  • 在保存之前缓存 previousDataFrame 并将其重用于下一步。

【问题讨论】:

  • 是的,出于某种原因,在 Spark 中保存数据帧非常缓慢。您可能想看看我给here 的答案——尽管投诉是关于写入S3,但我猜想OutputCommiterSUCCES 文件的建议仍然适用于HDFS。如果您在 AWS 上运行,您可以将需要持久化的 rdds 放到 Kineses 上,并使用 Firehose 存储它。这样一来,您的 Spark 应用程序就不会因写入而变慢。
  • 如果您不在 AWS 上运行,也许您仍然可以访问某种队列?或者你可以实现它——这个想法是在你的核心应用程序之外处理写入。
  • 谢谢,在您的回复的帮助下,我找到了问题,这是因为 AWS S3。许多输出之一是存储在 S3 上,这会减慢整个执行速度。将该输出移至 HDFS 后,执行在 10 分钟内完成。

标签: hadoop apache-spark spark-dataframe


【解决方案1】:

问题是导致执行延迟的 AWS S3 路径。当我开始将输出保存到 HDFS 时,执行时间减少了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-17
    • 1970-01-01
    • 1970-01-01
    • 2017-11-19
    相关资源
    最近更新 更多