【问题标题】:Spark SQL removing white spacesSpark SQL 删除空格
【发布时间】:2018-04-11 18:38:00
【问题描述】:

我有一个简单的 Spark 程序,它读取 JSON 文件并发出 CSV 文件。在 JSON 数据中,值包含前导和尾随空格,当我发出 CSV 时,前导和尾随空格消失了。有没有办法可以保留空间。我尝试了很多选项,例如 ignoreTrailingWhiteSpace 、 ignoreLeadingWhiteSpace 但没有运气

输入.json

{"key" : "k1", "value1": "Good String", "value2": "Good String"}
{"key" : "k1", "value1": "With Spaces      ", "value2": "With Spaces      "}
{"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}

输出.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces,With Spaces
,k1,with tab,with tab

expected.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces      ,With Spaces      
,k1,with tab\t,with tab\t

我的代码:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName(TestSpark.class.getName())
            .master("local[1]").getOrCreate();

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established");

    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);

    Dataset<Row> dataset =
            sparkSession.read()
                    .option("inferSchema", false)
                    .format("json")
                    .schema(employeeSchema)
                    .load("D:\\dev\\workspace\\java\\simple-kafka\\key_value.json");

    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView")
            .write()
            .option("header", true)
            .format("csv")
            .save("D:\\dev\\workspace\\java\\simple-kafka\\output\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

更新

添加了 POM 依赖项

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe spark-streaming apache-spark-mllib


    【解决方案1】:

    对于 Apache Spark 2.2+,您只需使用 "ignoreLeadingWhiteSpace""ignoreTrailingWhiteSpace" 选项(请参阅@Roberto Congiu 的答案中的详细信息)

    我猜这应该是较低 Apache Spark 版本的默认行为 - 不过我不确定。

    对于 Apache Spark 1.3+,您可以使用 "univocity" parserLib 来明确指定它:

    df.write
      .option("parserLib","univocity")
      .option("ignoreLeadingWhiteSpace","false")
      .option("ignoreTrailingWhiteSpace","false")
      .format("csv")
    

    旧的“不正确”答案 - 显示如何去除整个数据框(在所有列中)中的前导和尾随空格和制表符

    这是一个 scala 解决方案:

    来源 DF:

    scala> val df = spark.read.json("file:///temp/a.json")
    df: org.apache.spark.sql.DataFrame = [key: string, value1: string ... 1 more field]
    
    scala> df.show
    +---+-----------------+-----------------+
    |key|           value1|           value2|
    +---+-----------------+-----------------+
    | k1|      Good String|      Good String|
    | k1|With Spaces      |With Spaces      |
    | k1|        with tab   |        with tab       |
    +---+-----------------+-----------------+
    

    解决方案:

    import org.apache.spark.sql.functions._
    
    val df2 = df.select(df.columns.map(c => regexp_replace(col(c),"(^\\s+|\\s+$)","").alias(c)):_*)
    

    结果:

    scala> df2.show
    +---+----------+----------+
    |key|    value1|    value2|
    +---+----------+----------+
    | k1|GoodString|GoodString|
    | k1|WithSpaces|WithSpaces|
    | k1|   withtab|   withtab|
    +---+----------+----------+
    

    PS 在 Java Spark 中应该非常相似...

    【讨论】:

    • 我不想删除空格.. 我想保留 JSON 中的空格
    • @Manjesh,哦,我明白了。罗伯托的解决方案应该有效 - 你试过了吗?
    【解决方案2】:

    默认情况下,CSV 编写器会修剪前导和尾随空格。你可以关闭它

       sqlCtx.sql("select * from sourceView").write.
           option("header", true).
           option("ignoreLeadingWhiteSpace",false). // you need this
           option("ignoreTrailingWhiteSpace",false). // and this
           format("csv").save("/my/file/location")
    

    这对我有用。如果它对您不起作用,您能否发布您尝试过的内容,以及您使用的是哪个 spark 版本?如果我没记错的话,他们就在去年推出了这个功能。

    【讨论】:

    • 该代码适用于 Spark 2.2+,但不适用于 2.1。我已经添加了 POM 依赖项
    【解决方案3】:
    // hope these two options can solve your question
    spark.read.json(inputPath).write
        .option("ignoreLeadingWhiteSpace",false)
        .option("ignoreTrailingWhiteSpace", false)
        .csv(outputPath)
    

    您可以查看下面的链接以获取更多信息

    https://issues.apache.org/jira/browse/SPARK-18579

    https://github.com/apache/spark/pull/17310

    谢谢

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多