【Question Title】: Spark joins with condition on non-join column
【Posted】: 2019-04-17 18:55:38
【Question Description】:

I have the following two DataFrames, which I want to join on column A:

df1:

+------+--------+-------+
|  A   |   B    |   C   |
+------+--------+-------+
| a1   |   5    |   asd |
| a2   |   12   |   asd |
+------+--------+-------+

df2:

+------+--------+-------+
|  A   |   B    |   D   |
+------+--------+-------+
|  a1  |   8    |   qwe |
|  a2  |   10   |   qwe |
+------+--------+-------+

Since column B appears in both, assume there is some logic for choosing between the two values, for example keeping the larger one:

+------+--------+------+-----+
|  A   |   B    |  C   |  D  |
+------+--------+------+-----+
|  a1  |   8    |  asd | qwe |
|  a2  |   12   |  asd | qwe |
+------+--------+------+-----+

A straightforward way to achieve this is:

import org.apache.spark.sql.functions.{col, when}

val _df1 = df1.withColumnRenamed("B", "B_df1")
val _df2 = df2.withColumnRenamed("B", "B_df2")
_df1.join(_df2, Seq("A"))
    .withColumn("B", when(col("B_df1") > col("B_df2"), col("B_df1"))
        .otherwise(col("B_df2")))
    .drop("B_df1", "B_df2")

Is there a better way to achieve this without renaming and dropping columns?

【Comments】:

    Tags: apache-spark apache-spark-sql


    【Solution 1】:

    Here is an alternative approach using selectExpr. It saves some of the effort of dropping columns.

    import spark.implicits._
    import org.apache.spark.sql.functions.col

    val df1 = Seq(("a1", 5, "asd"),
                  ("a2", 12, "asd")
                  ).toDF("A", "B", "C")

    val df2 = Seq(("a1", 8, "qwe"),
                  ("a2", 10, "qwe")
                  ).toDF("A", "B", "D")

    df1.as("a").join(df2.as("b"), col("a.A") === col("b.A"))
       .selectExpr("a.A AS A",
                   "CASE WHEN a.B > b.B THEN a.B ELSE b.B END AS B",
                   "a.C AS C",
                   "b.D AS D").show()
    
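    As a side note: for this particular "keep the larger B" logic, Spark's
    built-in greatest function can express the choice directly, without a
    CASE expression. A minimal sketch, assuming the same df1 and df2 as
    above:

    import org.apache.spark.sql.functions.{col, greatest}

    // Alias both sides so the duplicate A and B columns stay addressable
    df1.as("a").join(df2.as("b"), col("a.A") === col("b.A"))
       .select(col("a.A").as("A"),
               greatest(col("a.B"), col("b.B")).as("B"),  // larger of the two Bs
               col("a.C").as("C"),
               col("b.D").as("D"))
       .show()

    Unlike the CASE expression, greatest also extends naturally to more than
    two columns.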

    【Discussion】:
