看起来@Steven 的答案在逻辑上很好,但如果您的数据没有很多相交元素(即完全外连接将产生巨大的数据集),则可能会遇到问题。您也在使用 DataFrames,因此转换为 RDDs 然后再转换回 DataFrames 对于可以使用 DataFrames API 完成的任务来说似乎是多余的。我将在下面描述如何做到这一点。
让我们从一些示例数据开始(取自您的示例):
val rdd1 = sc.parallelize(Array((1,1,"item A",10), (1,2,"item b",12), (1,3,"item c",12)))
val rdd2 = sc.parallelize(Array((1,2,"item b",50), (1,4,"item c",12), (1,5,"item c",12)))
接下来,我们可以将它们转换为不同列别名下的 DataFrame。我们在df1和df2这里使用了不同的别名,因为当我们最终加入这两个DataFrame时,后续的选择可以更容易编写(如果有办法在加入后识别列的来源,这是不必要的) .请注意,两个 DataFrame 的并集包含您要过滤的行。
val df1 = rdd1.toDF("id_location", "id_item", "name", "price")
val df2 = rdd2.toDF("id_location_2", "id_item_2", "name_2", "price_2")
// df1.unionAll(df2).show()
// +-----------+-------+------+-----+
// |id_location|id_item| name|price|
// +-----------+-------+------+-----+
// | 1| 1|item A| 10|
// | 1| 2|item b| 12|
// | 1| 3|item c| 12|
// | 1| 2|item b| 50|
// | 1| 4|item c| 12|
// | 1| 5|item c| 12|
// +-----------+-------+------+-----+
在这里,我们首先在键上连接两个 DataFrame,即 df1 和 df2 的前两个元素。然后,我们通过选择行(基本上来自df1)创建另一个DataFrame,其中存在来自df2 的行具有相同的连接键。之后,我们在df1 上运行一个except 来删除之前创建的DataFrame 中的所有行。这可以看作是一种补充,因为我们基本上所做的是从df1 中删除所有行,其中df2 中存在相同的("id_location", "id_item")。最后,我们将补码与df2 结合在一起以生成输出数据帧。
val df_joined = df1.join(df2, (df1("id_location") === df2("id_location_2")) && (df1("id_item") === df2("id_item_2")))
val df1_common_keyed = df_joined.select($"id_location", $"id_item", $"name", $"price")
val df1_complement = df1.except(df1_common_keyed)
val df_union = df1_complement.unionAll(df2)
// df_union.show()
// +-----------+-------+------+-----+
// |id_location|id_item| name|price|
// +-----------+-------+------+-----+
// | 1| 3|item c| 12|
// | 1| 1|item A| 10|
// | 1| 2|item b| 50|
// | 1| 4|item c| 12|
// | 1| 5|item c| 12|
// +-----------+-------+------+-----+
再次,就像@Steven 建议的那样,您可以通过将 DataFrames 转换为 RDD 并使用它运行来使用 RDD API。如果这是您想要做的,以下是使用subtractByKey() 和上面的输入 RDD 完成您想要的另一种方法:
val keyed1 = rdd1.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val keyed2 = rdd2.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val unionRDD = keyed1.subtractByKey(keyed2).values.union(rdd2)
// unionRDD.collect().foreach(println)
// (1,1,item A,10)
// (1,3,item c,12)
// (1,2,item b,50)
// (1,4,item c,12)
// (1,5,item c,12)