【Question Title】: spark scala dataframe merge multiple dataframes
【Posted】: 2017-07-05 01:48:28
【Question】:

I have three files coming in:

## +---+---+----+----+
## |pk1|pk2|val1|val2|
## +---+---+----+----+
## |  1| aa|  ab|  ac|
## |  2| bb|  bc|  bd|
## +---+---+----+----+

## +---+---+----+----+
## |pk1|pk2|val1|val2|
## +---+---+----+----+
## |  1| aa|  ab|  ad|
## |  2| bb|  bb|  bd|
## +---+---+----+----+

## +---+---+----+----+
## |pk1|pk2|val1|val2|
## +---+---+----+----+
## |  1| aa|  ac|  ad|
## |  2| bb|  bc|  bd|
## +---+---+----+----+

I need to compare the first two files (which I read in as dataframes), identify only the changes, and then merge those changes into the third file, so my output should be:

## +---+---+----+----+
## |pk1|pk2|val1|val2|
## +---+---+----+----+
## |  1| aa|  ac|  ad|
## |  2| bb|  bb|  bd|
## +---+---+----+----+

How do I select only the columns that changed, and update the other dataframe with them?

【Comments】:

  • I think you need to be more specific (there is some ambiguity), but have you tried a join? You can join on any condition (even !=).
  • May I know what the ambiguity is? I can join on the pks, but won't that just return everything? I mean df1 join df2 on df1.pk1=df2.pk1 and df1.pk2=df2.pk2? If that's how I should join, fine, but how do I get only the modified columns? For example, when I join the first 2 I should get only pk1->1, pk2->aa, val2->ad and pk1->2, pk2->bb, val1->bb.
  • The first dataframe has bc in the second row of column val1, the second dataframe has bb in the same column and row, and the third dataframe has bc. So how can your final dataframe have bb? Shouldn't that be bc?
  • Please read all 3 dataframes as 3 different files. I want to compare the first 2 dataframes (files), determine whether anything changed, and apply only those changes to the 3rd dataframe. So when I compare the first 2, I get val1 as bb (which is a change), and that change must be applied to the last dataframe, so my final result should be bb.
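The rule described in these comments — where a value changed between the first and second file, keep the second file's value; otherwise keep the third file's value — can be sketched per cell as a plain Scala function (a minimal illustration of the logic only, not Spark code; the helper name is made up):

```scala
// Minimal pure-Scala sketch of the merge rule from the comments above:
// if the value changed between file 1 and file 2, file 2's change wins;
// otherwise file 3's value is kept.
def mergeVal(v1: String, v2: String, v3: String): String =
  if (v1 != v2) v2 else v3

// Row 1, val1: unchanged between files 1 and 2, so file 3's value wins.
println(mergeVal("ab", "ab", "ac")) // ac
// Row 2, val1: changed from bc to bb, so file 2's change wins.
println(mergeVal("bc", "bb", "bc")) // bb
```

Applied cell-by-cell to the example, this reproduces the expected output rows (1, aa, ac, ad) and (2, bb, bb, bd).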

Tags: scala apache-spark dataframe merge


【Solution 1】:

I can't comment yet, so I'll take a stab at this; it may still need revision. As I understand it, you are looking for the last unique change. So val1 has {ab -> ab -> ac, bc -> bb -> bc}, and the final result is {ac, bb}, because the last file's bc already appears in the first file and is therefore not unique. If that's the case, the simplest way to handle it is to build a set and take the last value from it. I'll use a udf to do the work.
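The core of that udf can be tried out in plain Scala, with no Spark needed (a sketch with a made-up helper name). Note that it quietly relies on an implementation detail: small immutable Sets preserve insertion order.

```scala
// The udf's core idea: deduplicate the three values in order and take the
// last distinct one. This works here because scala.collection.immutable
// sets of up to 4 elements (Set1..Set4) preserve insertion order; that is
// NOT guaranteed for larger sets, so don't generalize the trick.
def lastUnique(a: String, b: String, c: String): String = Set(a, b, c).last

println(lastUnique("ab", "ab", "ac")) // ac -- Set(ab, ac).last
println(lastUnique("bc", "bb", "bc")) // bb -- Set(bc, bb).last, bc already seen
```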

From your example:

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad"),(2,"bb","bb","bd"))).toDF("pk1","pk2","val1","val2")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2") 

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.UserDefinedFunction
import sqlContext.implicits._

def getChange: UserDefinedFunction = 
    udf((a: String, b: String, c: String) => Set(a,b,c).last)

df1
  .join(df2, df1("pk1") === df2("pk1") && df1("pk2") === df2("pk2"), "inner")
  .join(df3, df1("pk1") === df3("pk1") && df1("pk2") === df3("pk2"), "inner")
  .select(df1("pk1"), df1("pk2"),
    df1("val1").as("df1Val1"), df2("val1").as("df2Val1"), df3("val1").as("df3Val1"),
    df1("val2").as("df1Val2"), df2("val2").as("df2Val2"), df3("val2").as("df3Val2"))
  .withColumn("val1", getChange($"df1Val1", $"df2Val1", $"df3Val1"))
  .withColumn("val2", getChange($"df1Val2", $"df2Val2", $"df3Val2"))
  .select($"pk1", $"pk2", $"val1", $"val2")
  .orderBy($"pk1")
  .show(false)

This produces:

+---+---+----+----+
|pk1|pk2|val1|val2|
+---+---+----+----+
|1  |aa |ac  |ad  |
|2  |bb |bb  |bd  |
+---+---+----+----+

Obviously, if you have more columns or more dataframes this gets a bit cumbersome to write out, but it should handle your example.

Edit:
This handles adding more columns to the mix. As I said above, it is a bit cumbersome. It iterates over each column until none are left.

require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns),"DF Columns do not match") //this is a check so may not be needed

val cols: Array[String] = df1.columns

def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a,b,c).last)

def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3: DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {

  val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
  val keys = cols.slice(0, 2) // get the keys
  val columns = cols.slice(2, cols.length).toSeq // get the columns to use

  def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {
    if (columns.isEmpty) list
    else {
      list += df1
        .join(df2, df1.col(keys(0)) === df2.col(keys(0)) && df1.col(keys(1)) === df2.col(keys(1)), "inner")
        .join(df3, df1.col(keys(0)) === df3.col(keys(0)) && df1.col(keys(1)) === df3.col(keys(1)), "inner")
        .select(df1.col(keys(0)), df1.col(keys(1)),
          getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)).as(columns.head))

      helper(columns.tail) // recurse over the remaining columns (tail recursion)
    }
  }

  helper(columns)
}

val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)

list.reduce((a,b) =>
  a
    .join(b,a(cols.head)===b(cols.head) && a(cols(1))===b(cols(1)),"inner")
    .drop(b(cols.head))
    .drop(b(cols(1))))
.select(cols.head, cols.tail: _*)
.orderBy(cols.head)
.show

An example with 3 value columns, which you then pass into the code above:

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac","ad"),(2,"bb","bc","bd","bc"))).toDF("pk1","pk2","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad","ae"),(2,"bb","bb","bd","bf"))).toDF("pk1","pk2","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad","ae"),(2,"bb","bc","bd","bg"))).toDF("pk1","pk2","val1","val2","val3")

Running the code above on these inputs produces:

//output
+---+---+----+----+----+
|pk1|pk2|val1|val2|val3|
+---+---+----+----+----+
|  1| aa|  ac|  ad|  ae|
|  2| bb|  bb|  bd|  bg|
+---+---+----+----+----+

There may well be a more efficient way to do this, but it's what came to mind.

Edit 2:

To do this with an arbitrary number of keys you can do the following. You need to define the number of keys at the start. This could probably be cleaned up too. I've had it working with 4/5 keys; you should run some tests of your own, but it should work:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ac","ad"),(2,"bb","d","e","bc","bd","bc"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ad","ae"),(2,"bb","d","e","bb","bd","bf"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ac","ad","ae"),(2,"bb","d","e","bc","bd","bg"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")

require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns),"DF Columns do not match")

val cols: Array[String] = df1.columns

def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a,b,c).last)

def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3: DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {

  val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
  val keys = cols.slice(0, 4) // get the keys
  val columns = cols.slice(4, cols.length).toSeq // get the columns to use

  def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {
    if (columns.isEmpty) list
    else {
      list += df1
        .join(df2, Seq(keys: _*), "inner")
        .join(df3, Seq(keys: _*), "inner")
        .withColumn(columns.head + "Out", getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)))
        .select(col(columns.head + "Out").as(columns.head) +: keys.map(x => df1.col(x)): _*)

      helper(columns.tail)
    }
  }

  helper(columns)
}

val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)
list.foreach(a => a.show(false))
val keys=cols.slice(0,4)

list.reduce((a,b) =>
  a.alias("a").join(b.alias("b"),Seq(keys :_*),"inner")
  .select("a.*","b." + b.columns.head))
  .orderBy(cols.head)
  .show(false)

This produces:

+---+---+---+---+----+----+----+
|pk1|pk2|pk3|pk4|val1|val2|val3|
+---+---+---+---+----+----+----+
|1  |aa |c  |d  |ac  |ad  |ae  |
|2  |bb |d  |e  |bb  |bd  |bg  |
+---+---+---+---+----+----+----+

【Discussion】:

  • Thanks, it works for the example. I won't be getting more dataframes, but the columns will change dynamically. It would be great if I could get it as a query.
  • Just to clarify: might the dataframes have more columns (say a 4th one), or just different column names, or both?
  • There will only ever be 3 files, but there will be more columns.
  • I've now added code above to do that. It assumes you use 2 primary keys, so if that changes you will need to modify the code.
  • One more question: we assumed only 2 primary keys, but more may be involved. Some files have 4 primary keys, some have 7. How do we make df1.col(keys(0)) === df2.col(keys(0)) && df1.col(keys(1)) === df2.col(keys(1)) dynamic?
【Solution 2】:

I was also able to do this by registering the dataframes as temporary views and then running a select with case statements, like this:

df1.createTempView("df1")
df2.createTempView("df2")
df3.createTempView("df3")

select case when df1.val1=df2.val1 and df1.val1<>df3.val1 then df3.val1 end

This is much faster.
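The snippet above is only the core CASE expression; a complete version of the query for the two-key example might look like the following. The FROM/JOIN clauses and the ELSE branch are assumptions filled in to match the expected output — the original comment left them out:

```sql
SELECT df1.pk1,
       df1.pk2,
       CASE WHEN df1.val1 = df2.val1 AND df1.val1 <> df3.val1
            THEN df3.val1 ELSE df2.val1 END AS val1,
       CASE WHEN df1.val2 = df2.val2 AND df1.val2 <> df3.val2
            THEN df3.val2 ELSE df2.val2 END AS val2
FROM df1
JOIN df2 ON df1.pk1 = df2.pk1 AND df1.pk2 = df2.pk2
JOIN df3 ON df1.pk1 = df3.pk1 AND df1.pk2 = df3.pk2
```

On the example data this yields (1, aa, ac, ad) and (2, bb, bb, bd): where df1 and df2 agree, df3's value is taken; where they differ, df2's change wins.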

【Discussion】:
