【发布时间】:2018-11-14 02:55:55
【问题描述】:
我有一个带有下一个信息的 Dataframe df:
id json_data
1 {a: "1", b: "2"}
1 {a: "1", b: "3"}
1 {a: "1", b: "4"}
2 {a: "1", b: "2"}
2 {a: "1", b: "6"}
我需要下一个最终结果:
id json_data
1 [{a: "1", b: "2"},{a: "1", b: "3"},{a: "1", b: "4"}]
2 [{a: "1", b: "2"},{a: "1", b: "6"}]
我尝试了两种不同的方法,分别使用 Window 函数和 groupBy。通过这两种方法,我都得到了想要的结果。
1º 接近:
var user_window = Window.partitionBy("id").orderBy("id")
val df2 = df.withColumn("json_data",
collect_list($"json_data").over(user_window))
.withColumn("rank", row_number().over(user_window))
.where("rank = 1")
2º 方法:
val df2 = df.groupBy(df("id")).agg(collect_list($"json_data").as("json_data"))
通过这两种方法,我获得了相同的性能。但是阅读有关 Spark 的文档,似乎这两种方法都效率不高,因为具有相同键的行将需要穿过集群(混洗)才能聚集在一起。我正在展示一个小例子,因为在生产中我有大量数据。做组或使用窗口功能需要很长时间。
有什么办法可以做到吗?
【问题讨论】:
-
您要问的基本上是如何比执行 group by 更有效地执行“group by”。我不确定我们能不能帮助你,除非你告诉我们更多关于你的数据(它是否不平衡?等等)。但是,如果您执行“分组依据”作为实现其他目的的中间操作,如果您告诉我们它是什么,我们或许能够帮助您更有效地实现该最终目的。
-
我可以更改源表,然后您可以使用
bucketing... 如果该表是由id分桶的,那么如果您执行groupBy($"id",则不会发生洗牌) -
我建议您在创建数据框时使用自定义分区,以便具有相同 id 的数据进入相同的执行程序。那么groupby应该不错
-
@RaphaelRoth 我想知道你的意思是不是这样:df.write.bucketBy(10, "id").saveAsTable("df_table")。
-
@sylvinho81 是的。我你红了这样一张表,做groupBy时物理计划中应该没有“交换”(洗牌)
标签: scala apache-spark grouping shuffle window-functions