【Title】: Transform row to nested JSON in Spark
【Posted】: 2019-09-28 15:25:01
【Question】:

I am trying to transform rows into a nested JSON structure in Spark.

The rows are loaded from a comma-separated values file:

identifier,timestamp,x,y
2,         456,      1,x
1,         456,      1,y
1,         123,      0,x
1,         789,      0,z

The rows should be transformed into the following JSON format (grouped by identifier and sorted by timestamp):

{"identifier":"1","events":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}

{"identifier":"2","events":[{"timestamp":"456","properties":{"x":"0","y":"z"}}]}

So far, I have managed to transform the data into...

{"identifier":"1","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}

{"identifier":"2","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"456","properties":{"x":"0","y":"z"}}]}

...using the following code:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;

public static void main(final String[] args) {
    // Nested structs: properties = {x, y}, event = {timestamp, properties}
    final Column properties = struct(col("x").as("x"), col("y").as("y")).as("properties");
    final Column event = struct(col("timestamp").as("timestamp"), properties).as("events");

    final SparkSession sparkSession = SparkSession.builder().getOrCreate();
    final Dataset<Row> events = sparkSession.read()
            .option("header", "true")
            .csv("/input/events")
            .sort(col("identifier").asc(), col("timestamp").asc());
    final Dataset<String> groupedEvents = events.groupBy("identifier").agg(collect_list(event)).toJSON();
    groupedEvents.write().text("/output/events");
    sparkSession.stop();
}

However, the resulting output also contains the auto-generated column name...

"collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)

...where I want that column to simply be named "events".

How can I produce the described transformation?

【Comments】:

Tags: java json apache-spark


    【Solution 1】:

    Eventually managed it myself with the following:

    import static org.apache.spark.sql.functions.col;

    public static void main(final String[] args) {
        final SparkSession sparkSession = SparkSession.builder().getOrCreate();
        final Dataset<Row> events = sparkSession.read().option("header", "true").csv("/input/events");
        events.createOrReplaceTempView("groupedevents");
        // sort_array orders each identifier's events by the struct's first field (timestamp),
        // and the "as events" alias replaces the auto-generated column name.
        final Dataset<String> groupedEvents = sparkSession.sql(
                "select identifier, sort_array(collect_list(struct(timestamp, struct(x, y) as properties))) as events "
                + "from groupedevents group by identifier").toJSON();
        groupedEvents.write().text("/output/events");
        sparkSession.stop();
    }
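
    For reference, the same group-by-identifier / sort-by-timestamp nesting can be sketched outside Spark in plain Python; this is a standalone illustration of the target JSON shape (the CSV sample and helper name are taken from or made up for this sketch, not part of the accepted answer):

    ```python
    import csv
    import io
    import json
    from collections import defaultdict

    # Sample rows from the question (identifier, timestamp, x, y).
    CSV_DATA = """identifier,timestamp,x,y
    2,456,1,x
    1,456,1,y
    1,123,0,x
    1,789,0,z
    """

    def rows_to_nested_json(csv_text):
        """Group rows by identifier and sort each group's events by timestamp."""
        groups = defaultdict(list)
        for row in csv.DictReader(io.StringIO(csv_text)):
            groups[row["identifier"].strip()].append(
                {"timestamp": row["timestamp"].strip(),
                 "properties": {"x": row["x"].strip(), "y": row["y"].strip()}})
        return [
            {"identifier": ident,
             "events": sorted(events, key=lambda e: e["timestamp"])}
            for ident, events in sorted(groups.items())
        ]

    # One JSON document per identifier, mirroring Spark's toJSON() output.
    for record in rows_to_nested_json(CSV_DATA):
        print(json.dumps(record))
    ```

    Each printed line is one JSON object per identifier with its events nested and ordered, which is the structure the SQL version above produces via collect_list and sort_array.
    
    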
    

    【Comments】:
