【Posted】: 2019-09-28 15:25:01
【Question】:
I am trying to convert rows into a nested JSON structure in Spark.
The rows are loaded from a comma-separated values file:
identifier,timestamp,x,y
2, 456, 1,x
1, 456, 1,y
1, 123, 0,x
1, 789, 0,z
The rows should be converted to the following JSON format (grouped by identifier and sorted by timestamp):
{"identifier":"1","events":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}
{"identifier":"2","events":[{"timestamp":"456","properties":{"x":"1","y":"x"}}]}
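To pin down the target shape independently of Spark, here is a minimal plain-Java sketch of the same grouping and ordering. The class name `ExpectedShape` and the method `toJsonLines` are hypothetical, every row is assumed to be an already-trimmed `{identifier, timestamp, x, y}` string array, timestamps are assumed to be numeric, and no JSON escaping is done.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public final class ExpectedShape {
    /** Each row is {identifier, timestamp, x, y}; values stay strings, as in the CSV. */
    public static List<String> toJsonLines(final List<String[]> rows) {
        // TreeMap keeps identifiers in ascending (lexicographic) order.
        final Map<String, List<String[]>> byIdentifier = new TreeMap<>();
        for (final String[] row : rows) {
            byIdentifier.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }

        final List<String> lines = new ArrayList<>();
        for (final Map.Entry<String, List<String[]>> entry : byIdentifier.entrySet()) {
            // Order the events of one identifier by numeric timestamp.
            final String events = entry.getValue().stream()
                .sorted(Comparator.comparingLong((String[] r) -> Long.parseLong(r[1])))
                .map(r -> String.format(
                    "{\"timestamp\":\"%s\",\"properties\":{\"x\":\"%s\",\"y\":\"%s\"}}",
                    r[1], r[2], r[3]))
                .collect(Collectors.joining(","));
            lines.add(String.format(
                "{\"identifier\":\"%s\",\"events\":[%s]}", entry.getKey(), events));
        }
        return lines;
    }
}
```

This is only meant to describe the expected result; it does not scale the way a Spark job would.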
So far, I have managed to transform the data into...
{"identifier":"1","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}
{"identifier":"2","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"456","properties":{"x":"0","y":"z"}}]}
using the following code:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

public static void main(final String[] args) {
    final Column properties = struct(col("x").as("x"), col("y").as("y")).as("properties");
    final Column event = struct(col("timestamp").as("timestamp"), properties).as("events");
    final SparkSession sparkSession = SparkSession.builder().getOrCreate();
    final Dataset<Row> events = sparkSession.read()
        .option("header", "true")
        .csv("/input/events")
        .sort(col("identifier").asc(), col("timestamp").asc());
    Dataset<String> groupedEvents = events.groupBy("identifier")
        .agg(collect_list(event))
        .toJSON();
    groupedEvents.write().text("/output/events");
    sparkSession.stop();
}
However, the resulting output also contains the generated key...
"collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)"
I want that key to be "events" instead.
How can I produce the described transformation?
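One likely cause is that the alias is applied to the struct passed into `collect_list` rather than to the aggregate itself, so Spark falls back to the generated expression text as the column name. A minimal sketch of aliasing the aggregate follows; the class `GroupedEvents` and the method `toGroupedJson` are hypothetical names, and the input is assumed to already have the columns `identifier`, `timestamp`, `x`, and `y`. The `sort_array` wrapper is an optional addition that is not in the original code: a `sort` before `groupBy` is not guaranteed to fix the order inside `collect_list`, and because `timestamp` is read as a string here, the ordering is lexicographic unless the column is cast.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.sort_array;
import static org.apache.spark.sql.functions.struct;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class GroupedEvents {
    // Hypothetical helper: expects columns identifier, timestamp, x, y.
    public static Dataset<String> toGroupedJson(final Dataset<Row> rows) {
        final Column properties = struct(col("x"), col("y")).as("properties");
        // timestamp is the first struct field, so sort_array orders events by it.
        final Column event = struct(col("timestamp"), properties);
        // Alias the aggregate itself; an alias on `event` would only name the
        // struct fed into collect_list, not the resulting column.
        final Column events = sort_array(collect_list(event)).as("events");
        return rows.groupBy("identifier").agg(events).toJSON();
    }
}
```

In the original `main`, the smallest change along these lines would be to move `.as("events")` from the `event` struct onto `collect_list(event)`.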
【Discussion】:
Tags: java json apache-spark