【问题标题】:Evolving a schema with Spark DataFrame使用 Spark DataFrame 演化模式
【发布时间】:2016-02-21 18:54:07
【问题描述】:

我正在使用一个 Spark 数据框,它可以从三个不同的架构版本之一加载数据:

// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }

我可以通过检查架构是否包含字段“C”以及是否不向数据框添加新列来处理附加的“C”。但是我不知道如何为子对象创建一个字段。

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "2", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
    if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
        df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
    }
    // Not sure what to put here. The fieldNames does not contain the "A.D"

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}

【问题讨论】:

    标签: java apache-spark dataframe apache-spark-sql


    【解决方案1】:

    JSON 源不太适合具有不断变化的架构的数据(Avro 或 Parquet 怎么样),但简单的解决方案是对所有源使用相同的架构并使新字段可选/可为空:

    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    
    val schema = StructType(Seq(
      StructField("A", StructType(Seq(
        StructField("B", LongType, true), 
        StructField("D", LongType, true)
      )), true),
      StructField("C", LongType, true)))
    

    您可以像这样将schema 传递给DataFrameReader

    val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
    val df1 = sqlContext.read.schema(schema).json(rddV1)
    
    val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
    val df2 = sqlContext.read.schema(schema).json(rddV2)
    
    val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
    val df3 = sqlContext.read.schema(schema).json(rddV3)
    

    您将获得独立于变体的一致结构:

    require(df1.schema == df2.schema && df2.schema == df3.schema)
    

    缺少的列自动设置为null:

    df1.printSchema
    // root
    //  |-- A: struct (nullable = true)
    //  |    |-- B: long (nullable = true)
    //  |    |-- D: long (nullable = true)
    //  |-- C: long (nullable = true)
    
    df1.show
    // +--------+----+
    // |       A|   C|
    // +--------+----+
    // |[1,null]|null|
    // +--------+----+
    
    df2.show
    // +--------+---+
    // |       A|  C|
    // +--------+---+
    // |[1,null]|  2|
    // +--------+---+
    
    df3.show
    // +-----+---+
    // |    A|  C|
    // +-----+---+
    // |[1,3]|  2|
    // +-----+---+
    

    注意

    此解决方案取决于数据源。它可能适用于其他来源,也可能不适用于其他来源,或even result in malformed records

    【讨论】:

    • 为什么说 JSON 不适合不断发展的模式?
    • @mlk 我们与不断发展的模式和 JSON 的主要问题是各种 JSON 客户端可能会做一些意想不到的事情,例如将您期望作为空字符串的空部分呈现为空字符串(即“”) .这真的会打乱您的架构管理......我想 zero323 也有类似的担忧
    • 在 Ewan 的评论之上,JSON 既不自描述也不支持模式。您当然可以为 JSON 文档使用或创建自定义超媒体格式,但没有语义的一部分。当我们处理 JSONL 时,它变得特别烦人。如果没有通过整个文件的锣,就不可能推断出架构。此外,如果数据格式不正确,这是我们在运行时遇到的问题。
    • 正如我们所说,如果您读取两个不同分区的 parquet 文件夹,这将不适用于 spark 2.1.0。即使您在阅读之前指定架构,列顺序也会混乱
    • 作为参考,我处理不断发展的模式的方式是这样的:我公开并使用私有 Structype.mergeSchema 手动合并来自不同来源的模式(从 ParquetFileFormat.mergeSchemasInParallel 读取文件的子集) ,然后我读取给出显式模式的数据帧,最后使用 select(col: _*) 技巧对列进行重新排序。只有这样,我才执行联合。
    【解决方案2】:

    zero323 已经回答了这个问题,但在 Scala 中。这是同样的事情,但在 Java 中。

    public void evolvingSchema() {
        String versionOne = "{ \"A\": {\"B\": 1 } }";
        String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
        String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";
    
        process(spark.getContext(), "1", versionOne);
        process(spark.getContext(), "2", versionTwo);
        process(spark.getContext(), "2", versionThree);
    }
    
    private static void process(JavaSparkContext sc, String version, String data) {
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("A",
                        DataTypes.createStructType(Arrays.asList(
                                DataTypes.createStructField("B", DataTypes.LongType, true),
                        DataTypes.createStructField("D", DataTypes.LongType, true))), true),
                DataTypes.createStructField("C", DataTypes.LongType, true)));
    
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));
    
        try {
            df.select("C").collect();
        } catch(Exception e) {
            System.out.println("Failed to C for " + version);
        }
        try {
            df.select("A.D").collect();
        } catch(Exception e) {
            System.out.println("Failed to A.D for " + version);
        }
    }
    

    【讨论】:

    • 谢谢。 Java api 与更常见的 scala 完全不同
    • 谢谢,对java感兴趣的开发者少之又少。
    猜你喜欢
    • 1970-01-01
    • 2019-02-16
    • 2017-12-31
    • 2021-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多