【问题标题】:Writing CSV file using Spark and java - handling empty values and quotes使用 Spark 和 java 编写 CSV 文件 - 处理空值和引号
【发布时间】:2020-06-10 14:23:58
【问题描述】:

初始数据在 Dataset 中,我正在尝试写入管道分隔文件,我希望将每个非空单元格和非空值放在引号中。空值或 null 值不应包含引号

result.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("nullValue", "")
            .option("quoteAll", "false")
            .csv(Location);

预期输出:

"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"

电流输出:

London||UK
Delhi|India
Moscow|Russia

如果我将“quoteAll”更改为“true”,我得到的输出是:

"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"

Spark 版本是 2.3,java 版本是 java 8

【问题讨论】:

  • 通常,不包含管道或引号的值不需要引号。这就是 CSV 通常的工作方式。您为什么希望引用这些值?
  • "quoteAll" to "true" 输出是正确的。 London||UK|| 之间是一个单元格。所以你应该用""标记它,如果你想使用quote(成为一个有效的csv)。结果"London"|""|"UK"
  • @RealSkeptic 以前的代码版本在 Spark 1.6 中,它就是这样工作的。现在,当升级到 2.3 时,它会为所有内容或任何内容都提供引号。但是业务用户希望它采用以前的格式,并且不希望进行任何更改。我想检查手动将引号添加到所有非空值的可能性,然后将“quoteAll”设置为“false”
  • @KunLun 我不希望空单元格用引号引起来。我希望没有引号的空单元格和带有值的单元格在引号中。
  • 这篇文章有你要找的一切:stackoverflow.com/questions/36248206/…

标签: java csv apache-spark java-8 apache-spark-2.3


【解决方案1】:

编辑和警告:没有看到 java 标签。这是 Scala 解决方案,它使用foldLeft 作为循环遍历所有列。如果将其替换为 Java 友好循环,则一切都应该按原样工作。稍后我会尝试回顾一下。

程序化解决方案可以是

val columns = result.columns
val randomColumnName = "RND"

val result2 = columns.foldLeft(result) { (data, column) =>
data
  .withColumnRenamed(column, randomColumnName)
  .withColumn(column,
    when(col(randomColumnName).isNull, "")
      .otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
  )
  .drop(randomColumnName)
}

这将生成带有" 的字符串,并将空字符串写入空值。如果您需要保留空值,请保留它们。

那就写下来吧:

result2.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("quoteAll", "false")
            .csv(Location);

【讨论】:

  • 我特别在寻找java而不是scala的解决方案。
  • 我从未将 Java 与 Spark 一起使用,但唯一特定于 Scala 的应该是 foldLeft,可以用不同的循环替换。其余的答案应该是可靠的。或者至少是逻辑。
【解决方案2】:

Java 答案。 CSV 转义不仅仅是在周围添加 " 符号。您应该在字符串中处理 "。因此,让我们使用 StringEscapeUtils 并定义将调用它的 UDF。然后只需将 UDF 应用于每一列。

import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import java.util.Arrays;

public class Test {

    void test(Dataset<Row> result, String Location) {
        // define UDF
        UserDefinedFunction escape = udf(
            (String str) -> str.isEmpty()?"":StringEscapeUtils.escapeCsv(str), DataTypes.StringType
        );
        // call udf for each column
        Column columns[] = Arrays.stream(result.schema().fieldNames())
                .map(f -> escape.apply(col(f)).as(f))
                .toArray(Column[]::new);

         // save the result
        result.select(columns)
                .coalesce(1).write()
                .option("delimiter", "|")
                .option("header", "true")
                .option("nullValue", "")
                .option("quoteAll", "false")
                .csv(Location);
    }
}

旁注:coalesce(1) 是一个错误的调用。它收集一个执行者的所有数据。您可以在生产环境中为庞大的数据集获取 executor OOM。

【讨论】:

  • 谢谢,我刚刚写完一个类似的 udf,但在遍历每一列时遇到了困难。我正在向新数据集添加列并执行连接操作。这当然更有效,更好。只是想检查是否有办法获取标题的引号。
  • 更改此行:escape.apply(col(f)).as(f))escape.apply(col(f)).as(StringEscapeUtils.escapeCsv(f))) 以引用标题
  • 当实际数据包含分隔符时,我仍然面临问题,引号没有按预期出现,并且正在生成额外的空值。你能建议如何处理它
  • 你能举个例子吗?
【解决方案3】:

这当然不是一个有效的答案,我正在根据 Artem Aliev 给出的答案对其进行修改,但认为它对少数人有用,因此发布此答案

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;<br/>
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;<br/>
public class Quotes {<br/>
    private static final String DELIMITER = "|";
    private static final String Location = "Give location here";

    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession.builder() 
                .master("local") 
                .appName("Spark Session") 
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> result = sparkSession.read()
                .option("header", "true")
                .option("delimiter",DELIMITER)
                .csv("Sample file to read"); //Give the details of file to read here

      UserDefinedFunction udfQuotesNonNull = udf(
        (String abc) -> (abc!=null? "\""+abc+"\"":abc),DataTypes.StringType
      );

      result = result.withColumn("ind_val", monotonically_increasing_id()); //inducing a new column to be used for join as there is no identity column in source dataset


      Dataset<Row> dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing temporary results
      Dataset<Row> dataset = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")));  //Dataset used for storing output

      String[] str = result.schema().fieldNames();
      dataset1.show();
      for(int j=0; j<str.length-1;j++)
      {
        dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")),(udfQuotesNonNull.apply(col(str[j]).cast("string")).alias("\""+str[j]+"\""))); 
        dataset=dataset.join(dataset1,"ind_val"); //Joining based on induced column
      }
      result = dataset.drop("ind_val");

      result.coalesce(1).write()
      .option("delimiter", DELIMITER)
      .option("header", "true")
      .option("quoteAll", "false")
      .option("nullValue", null)
      .option("quote", "\u0000") 
      .option("spark.sql.sources.writeJobUUID", false)
      .csv(Location);
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-09
    • 1970-01-01
    • 2017-01-04
    • 2015-10-18
    相关资源
    最近更新 更多