【问题标题】:Spark SQL expand array to multiple columnsSpark SQL 将数组扩展为多列
【发布时间】:2017-05-27 03:20:07
【问题描述】:

我正在为来自 S3 中的 oracle 源的每个行更新存储 json 消息。 json结构如下

{
    "tableName": "ORDER",
    "action": "UPDATE",
    "timeStamp": "2016-09-04 20:05:08.000000",
    "uniqueIdentifier": "31200477027942016-09-05 20:05:08.000000",
    "columnList": [{
        "columnName": "ORDER_NO",
        "newValue": "31033045",
        "oldValue": ""
    }, {
        "columnName": "ORDER_TYPE",
        "newValue": "N/B",
        "oldValue": ""
    }]
}

我正在使用 spark sql 根据唯一标识符的最大值查找每个键的最新记录。 columnList 是一个包含表列列表的数组。我想加入多个表并获取最新的记录。 如何将一个表的 json 数组中的列与另一表中的列连接起来。有没有办法将 json 数组分解为多列。例如,上面的 json 将 ORDER_NO 作为一列, ORDER_TYPE 作为另一列。如何根据 columnName 字段创建具有多列的数据框 例如:新的 RDD 应该有列(tableName、action、timeStamp、uniqueIdentifier、ORDER_NO、ORDER_NO) ORDER_NO 和 ORDER_NO 字段的值应从 json 中的 newValue 字段映射。

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    已通过使用 RDD api 以编程方式创建架构找到了解决方案

      Dataset<Row> dataFrame = spark.read().json(inputPath);
        dataFrame.printSchema();
        JavaRDD<Row> rdd = dataFrame.toJavaRDD();
        SchemaBuilder schemaBuilder = new SchemaBuilder();
        // get the schema column names in appended format
        String columnNames = schemaBuilder.populateColumnSchema(rdd.first(), dataFrame.columns());
    

    SchemaBuilder 是一个创建的自定义类,它采用 rdd 详细信息并返回分隔符分隔的列名。 然后使用 RowFactory.create 调用,将 json 值映射到模式。 文档参考http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-01
      • 2022-01-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-05
      相关资源
      最近更新 更多