I can't comment yet, so I'll take a stab at this; it may still need tweaking. As I understand it, you are looking for the last unique change. So for Val1 you have {ab -> ab -> ac, bc -> bb -> bc}, giving a final result of {ac, bb}, because the bc in the last file already appears in the first file and is therefore not unique. If that's the case, then the simplest way to handle it is to build a Set and take the last value out of it. I'll use a udf to do the work.
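To see what the Set trick does on its own (a quick sketch in plain Scala, no Spark needed; the values are the Val1 sequences from above):

Set("ab", "ab", "ac").last // dedupes to Set(ab, ac), so .last == "ac"
Set("bc", "bb", "bc").last // dedupes to Set(bc, bb), so .last == "bb"

Note this leans on the fact that Scala's small immutable sets (up to four elements) iterate in insertion order, which holds for the three inputs here.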
From your example:
val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad"),(2,"bb","bb","bd"))).toDF("pk1","pk2","val1","val2")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2")
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.UserDefinedFunction
import sqlContext.implicits._
def getChange: UserDefinedFunction =
  udf((a: String, b: String, c: String) => Set(a, b, c).last)

df1
  .join(df2, df1("pk1") === df2("pk1") && df1("pk2") === df2("pk2"), "inner")
  .join(df3, df1("pk1") === df3("pk1") && df1("pk2") === df3("pk2"), "inner")
  .select(df1("pk1"), df1("pk2"),
    df1("val1").as("df1Val1"), df2("val1").as("df2Val1"), df3("val1").as("df3Val1"),
    df1("val2").as("df1Val2"), df2("val2").as("df2Val2"), df3("val2").as("df3Val2"))
  .withColumn("val1", getChange($"df1Val1", $"df2Val1", $"df3Val1"))
  .withColumn("val2", getChange($"df1Val2", $"df2Val2", $"df3Val2"))
  .select($"pk1", $"pk2", $"val1", $"val2")
  .orderBy($"pk1")
  .show(false)
This produces:
+---+---+----+----+
|pk1|pk2|val1|val2|
+---+---+----+----+
|1 |aa |ac |ad |
|2 |bb |bb |bd |
+---+---+----+----+
Obviously, if you use more columns or more dataframes then writing this out gets a bit cumbersome, but it should solve your example.
Edit:
This is for adding more columns into the mix. As I said above, it is a bit cumbersome: it builds one small dataframe per value column (the keys plus that single resolved column), iterating until no columns are left, and then joins them all back together.
require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns), "DF Columns do not match") // a sanity check, so may not be needed

val cols: Array[String] = df1.columns

def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a, b, c).last)

def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3: DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {
  val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
  val keys = cols.slice(0, 2)                    // the key columns
  val columns = cols.slice(2, cols.length).toSeq // the value columns to process
  def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {
    if (columns.isEmpty) list
    else {
      list += df1
        .join(df2, df1.col(keys(0)) === df2.col(keys(0)) && df1.col(keys(1)) === df2.col(keys(1)), "inner")
        .join(df3, df1.col(keys(0)) === df3.col(keys(0)) && df1.col(keys(1)) === df3.col(keys(1)), "inner")
        .select(df1.col(keys(0)), df1.col(keys(1)),
          getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)).as(columns.head))
      helper(columns.tail) // tail recursion over the remaining columns
    }
  }
  helper(columns)
}
val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)

list.reduce((a, b) =>
    a.join(b, a(cols.head) === b(cols.head) && a(cols(1)) === b(cols(1)), "inner")
      .drop(b(cols.head))
      .drop(b(cols(1))))
  .select(cols.head, cols.tail: _*)
  .orderBy(cols.head)
  .show
An example with 3 value columns, which are then fed through the code above:
val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac","ad"),(2,"bb","bc","bd","bc"))).toDF("pk1","pk2","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad","ae"),(2,"bb","bb","bd","bf"))).toDF("pk1","pk2","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad","ae"),(2,"bb","bc","bd","bg"))).toDF("pk1","pk2","val1","val2","val3")
Running the code above on these produces:
//output
+---+---+----+----+----+
|pk1|pk2|val1|val2|val3|
+---+---+----+----+----+
| 1| aa| ac| ad| ae|
| 2| bb| bb| bd| bg|
+---+---+----+----+----+
There may well be a more efficient way to do this, but nothing comes to mind right now.
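That said, one candidate (a minimal sketch, untested beyond the shapes above; keyCols and valCols are names I'm introducing here) is to resolve all the value columns in a single pair of joins instead of one join per column, reusing df1/df2/df3, cols and getChange from above:

val keyCols = cols.slice(0, 2) // the same two keys as above
val valCols = cols.drop(2)     // the value columns

df1
  .join(df2, keyCols.toSeq, "inner")
  .join(df3, keyCols.toSeq, "inner")
  // one getChange per value column, all in a single select
  .select(keyCols.map(x => df1.col(x)) ++ valCols.map(c => getChange(df1.col(c), df2.col(c), df3.col(c)).as(c)): _*)
  .orderBy(cols.head)
  .show(false)

This avoids building and re-joining the intermediate list of dataframes entirely.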
Edit 2:
To do this with an arbitrary number of keys you can do the following. You have to define the number of keys at the start, and this could probably also be cleaned up. I've had it working with 4/5 keys; you should run some tests of your own, but it should work:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction
val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ac","ad"),(2,"bb","d","e","bc","bd","bc"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ad","ae"),(2,"bb","d","e","bb","bd","bf"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ac","ad","ae"),(2,"bb","d","e","bc","bd","bg"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns),"DF Columns do not match")
val cols: Array[String] = df1.columns
def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a,b,c).last)
def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3: DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {
  val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
  val keys = cols.slice(0, 4)                    // the key columns
  val columns = cols.slice(4, cols.length).toSeq // the value columns to process
  def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {
    if (columns.isEmpty) list
    else {
      list += df1
        .join(df2, Seq(keys: _*), "inner")
        .join(df3, Seq(keys: _*), "inner")
        .withColumn(columns.head + "Out", getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)))
        .select(col(columns.head + "Out").as(columns.head) +: keys.map(x => df1.col(x)): _*)
      helper(columns.tail)
    }
  }
  helper(columns)
}
val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)
list.foreach(a => a.show(false))

val keys = cols.slice(0, 4)

list.reduce((a, b) =>
    a.alias("a").join(b.alias("b"), Seq(keys: _*), "inner")
      .select("a.*", "b." + b.columns.head))
  .select(cols.head, cols.tail: _*) // restore the original column order
  .orderBy(cols.head)
  .show(false)
This produces:
+---+---+---+---+----+----+----+
|pk1|pk2|pk3|pk4|val1|val2|val3|
+---+---+---+---+----+----+----+
|1 |aa |c |d |ac |ad |ae |
|2 |bb |d |e |bb |bd |bg |
+---+---+---+---+----+----+----+