【问题标题】:How to save a spark DataFrame as csv on disk?如何在磁盘上将 spark DataFrame 保存为 csv?
【发布时间】:2016-01-15 10:34:27
【问题描述】:

例如这样的结果:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

将返回一个数组。

如何将 spark DataFrame 保存为磁盘上的 csv 文件?

【问题讨论】:

  • 顺便说一句,这不会返回一个数组,而是一个 DataFrame! reference here
  • 如果给出的答案解决了您的问题,请接受并投票,以便我们将此问题归类为已解决!

标签: scala apache-spark apache-spark-sql


【解决方案1】:

Apache Spark 不支持磁盘上的本机 CSV 输出。

不过,您有四种可用的解决方案:

  1. 您可以将 Dataframe 转换为 RDD :

    def convertToReadableString(r : Row) = ???
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
    

    这将创建一个文件夹文件路径。在文件路径下,您会找到分区文件(例如 part-000*)

    如果我想将所有分区附加到一个大的 CSV 中,我通常会做的是

    cat filePath/part* > mycsvfile.csv
    

    有些人会使用 coalesce(1,false) 从 RDD 创建一个分区。这通常是一种不好的做法,因为它可能会将您收集的所有数据都拉给它,从而使驾驶员不知所措。

    请注意,df.rdd 将返回 RDD[Row]

  2. 通过Spark,您可以使用databricks spark-csv library:

    • Spark 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • Spark 1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. 对于 Spark 2.x,不需要 spark-csv 包,因为它包含在 Spark 中。

    df.write.format("csv").save(filepath)
    
  4. 您可以转换为本地 Pandas 数据框并使用to_csv 方法(仅限 PySpark)。

注意:解决方案 1、2 和 3 将生成 CSV 格式文件 (part-*),由 Spark 在您调用 save 时调用的底层 Hadoop API 生成。每个分区会有一个part- 文件。

【讨论】:

  • 我认为spark-csv 是首选解决方案。从头开始创建正确的 csv 行并不容易。所有方言和正确的转义都可能非常棘手。
  • 在 PySpark 中,您还可以将小表转换为 Pandas 并保存在本地。但这可能是一个 Scala 问题。
  • 如果您想将信息添加到答案@zero323,请随时这样做!
  • 你们知道是否有可能避免使用hadoopish格式并将数据存储到文件名或我选择的s3键名下的文件中,而不是使用_SUCCES和@的目录987654338@?
  • 我使用 spark-csv 发布了解决方案
【解决方案2】:

以 csv 格式将数据帧写入磁盘与从 csv 读取类似。如果您希望将结果作为一个文件,您可以使用 coalesce。

df.coalesce(1)
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("output/path")

如果您的结果是一个数组,您应该使用特定于语言的解决方案,而不是 spark 数据框 api。因为所有这些结果都返回驱动机器。

【讨论】:

    【解决方案3】:

    我有类似的问题。当我以客户端模式连接到集群时,我需要在驱动程序上写下 csv 文件。

    我想重用与 Apache Spark 相同的 CSV 解析代码以避免潜在错误。

    我检查了 spark-csv 代码,在com.databricks.spark.csv.CsvSchemaRDD 中找到了负责将数据帧转换为原始 csv 的代码RDD[String]

    遗憾的是,它使用sc.textFile 和相关方法的结尾进行了硬编码。

    我复制粘贴了该代码并删除了最后一行 sc.textFile 并直接返回了 RDD。

    我的代码:

    /*
      This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
      Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
      But in last lines of that method it's hardcoded against writing as text file -
      for our case we need RDD.
     */
    object DataframeToRawCsvRDD {
    
      val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
    
      def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
               (implicit ctx: ExecutionContext): RDD[String] = {
        val delimiter = parameters.getOrElse("delimiter", ",")
        val delimiterChar = if (delimiter.length == 1) {
          delimiter.charAt(0)
        } else {
          throw new Exception("Delimiter cannot be more than one character.")
        }
    
        val escape = parameters.getOrElse("escape", null)
        val escapeChar: Character = if (escape == null) {
          null
        } else if (escape.length == 1) {
          escape.charAt(0)
        } else {
          throw new Exception("Escape character cannot be more than one character.")
        }
    
        val quote = parameters.getOrElse("quote", "\"")
        val quoteChar: Character = if (quote == null) {
          null
        } else if (quote.length == 1) {
          quote.charAt(0)
        } else {
          throw new Exception("Quotation cannot be more than one character.")
        }
    
        val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
        val quoteMode: QuoteMode = if (quoteModeString == null) {
          null
        } else {
          QuoteMode.valueOf(quoteModeString.toUpperCase)
        }
    
        val nullValue = parameters.getOrElse("nullValue", "null")
    
        val csvFormat = defaultCsvFormat
          .withDelimiter(delimiterChar)
          .withQuote(quoteChar)
          .withEscape(escapeChar)
          .withQuoteMode(quoteMode)
          .withSkipHeaderRecord(false)
          .withNullString(nullValue)
    
        val generateHeader = parameters.getOrElse("header", "false").toBoolean
        val headerRdd = if (generateHeader) {
          ctx.sparkContext.parallelize(Seq(
            csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
          ))
        } else {
          ctx.sparkContext.emptyRDD[String]
        }
    
        val rowsRdd = dataFrame.rdd.map(row => {
          csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
        })
    
        headerRdd union rowsRdd
      }
    
    }
    

    【讨论】:

      【解决方案4】:

      我有类似的问题,我必须将数据框的内容保存到我定义的名称的 csv 文件中。 df.write("csv").save("<my-path>") 正在创建目录而不是文件。所以不得不想出以下解决方案。 大部分代码取自以下dataframe-to-csv,对逻辑进行了少量修改。

      def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
          val tmpParquetDir = "Posts.tmp.parquet"
      
          df.repartition(1).write.
              format("com.databricks.spark.csv").
              option("header", header.toString).
              option("delimiter", sep).
              save(tmpParquetDir)
      
          val dir = new File(tmpParquetDir)
          val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
          val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
          (new File(tmpTsvFile)).renameTo(new File(tsvOutput))
      
          dir.listFiles.foreach( f => f.delete )
          dir.delete
          }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-09-12
        • 2011-10-31
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多