【问题标题】:Spark dataframe databricks csv appends extra double quotesSpark数据框databricks csv附加额外的双引号
【发布时间】:2017-06-07 13:51:45
【问题描述】:

似乎当我在spark sql 中的dataframe 上应用CONCAT 并将dataframe 作为csv 文件存储在HDFS 位置时,会在concat 列中添加额外的双引号单独在输出文件中。

当我应用显示时不会添加这个双引号。只有当我将 dataframe 存储为 csv 文件时才会添加这个双引号

看来我需要删除在将dataframe 保存为 csv 文件时添加的额外双引号。

我正在使用com.databricks:spark-csv_2.10:1.1.0 jar

Spark 版本为 1.5.0-cdh5.5.1

输入:

 campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,    1
 campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,    2

预期输出:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,     campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

火花代码:

  object campaignResultsMergerETL extends BaseETL {

  val now  = ApplicationUtil.getCurrentTimeStamp()
  val conf = new Configuration()
  val fs  = FileSystem.get(conf)
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    //---------------------
    code for sqlContext Initialization 
    //---------------------
    val campaignResultsDF  = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
    campaignResultsDF.registerTempTable("campaign_results")
    val campaignGroupedDF =  sqlContext.sql(
   """
    |SELECT campaign_file_name,
    |campaign_name,
    |tracker_id,
    |SUM(campaign_measure) AS campaign_measure
    |FROM campaign_results
    |GROUP BY campaign_file_name,campaign_name,tracker_id
  """.stripMargin)

    campaignGroupedDF.registerTempTable("campaign_results_full")

    val campaignMergedDF =  sqlContext.sql(
  s"""
    |SELECT campaign_file_name,
    |tracker_id,
    |CONCAT(campaign_name,'\"=\"' ,campaign_measure),
    |"$now" AS audit_timestamp
    |FROM campaign_results_full
  """.stripMargin)

   campaignMergedDF.show(20)
   saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)

   }


    def saveAsCSVFiles(campaignMeasureDF:DataFrame,hdfs_output_loc:String,numPartitions:Int): Unit =
    {
       log.info("saveAsCSVFile method started")
       if (fs.exists(new Path(hdfs_output_loc))){
          fs.delete(new Path(hdfs_output_loc), true)
       }
     campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
       log.info("saveAsCSVFile method ended")
    }

 }

campaignMergedDF.show(20) 的结果正确且工作正常。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

saveAsCSVFiles 的结果:这是不正确的。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   "campaign_name_1""=""1",  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   "campaign_name_1""=""2",  2017-06-06 17:09:31

有人可以帮我解决这个问题吗?

【问题讨论】:

    标签: apache-spark apache-spark-sql databricks


    【解决方案1】:

    当你使用时

    write.format("com.databricks.spark.csv").save(hdfs_output_loc)
    

    为了将包含 " 的文本写入 csv 文件,您会遇到问题,因为 spark-csv

    " 符号定义为默认引用

    将默认引号从 " 替换为其他内容(例如 NULL)应该允许您按原样将 " 写入文件。

    write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)
    

    说明:

    您使用的是默认 spark-csv:

    • 转义值为\
    • quote 值为"

    spark-csv doc

    • 引号:默认情况下引号字符是“,但可以设置为任何字符。引号内的分隔符被忽略
    • 转义:默认转义字符为\,但可以设置为任何字符。转义的引号字符被忽略

    This answer 建议如下:

    关闭双引号字符默认转义的方法 (") 与反斜杠字符 () - 即避免所有人转义 完全字符,您必须添加一个 .option() 方法调用 .write() 方法调用后的正确参数。的目标 option() 方法调用是更改 csv() 方法“查找”的方式 “引号”字符的实例,因为它正在发出内容。到 这样做,您必须更改“引用”实际含义的默认值; 即改变从双引号字符寻找的字符 (") 转换为 Unicode "\u0000" 字符(本质上提供 Unicode NUL 字符假设它永远不会出现在文档中)。

    【讨论】:

    • @SurenderRaja - 太棒了! :-)
    猜你喜欢
    • 2020-12-28
    • 1970-01-01
    • 2019-10-07
    • 2016-09-05
    • 1970-01-01
    • 1970-01-01
    • 2013-03-26
    • 2014-10-04
    • 2016-09-22
    相关资源
    最近更新 更多