【问题标题】:Compare two data frames with the same schema row by row逐行比较具有相同架构的两个数据框
【发布时间】:2019-01-14 23:54:31
【问题描述】:

我需要逐行比较 spark 数据帧中的两个表(即数据帧),并为特定列获取具有较小值的每一行。例如:

假设我想为每个学生获取每行得分较低的科目,因此我想要这个结果:

我想先用 id 作为连接属性来连接两个数据框,但是我的原始表很大并且有更多属性。似乎这在没有加入的情况下是可行的。我能找到的最接近的问题是this,但我不知道如何将其应用于我的案例。

顺便说一句,带连接的解决方案也很重要,我只是在想是否有更好的解决方案。

【问题讨论】:

  • 使用 join 也可以,但即使使用 join,我仍然不知道如何得到我想要的结果

标签: apache-spark dataframe apache-spark-sql


【解决方案1】:

你不能不加入,除非你能保证每个分区都有相同数量的分区和每个分区中的记录。然后你可以转换为 RDD 和 zip。否则只是join:

import org.apache.spark.sql.functions.{least, struct}

val df1 = Seq(
  (345, "math", 70), (992, "chem", 76), (223, "bio", 80)
).toDF("id", "subject", "score")

val df2 = Seq(
  (345, "psy", 64), (992, "ant", 94), (223, "math",   45)
).toDF("id", "subject", "score")

df1.alias("df1")
  .join(df2.alias("df2"), Seq("id"))
  .select($"id", 
    least(struct($"df1.score", $"df1.subject"),
          struct($"df2.score", $"df2.subject")).alias("score"))
  .select($"id", $"score.subject", $"score.score")

// +---+-------+-----+
// | id|subject|score|
// +---+-------+-----+
// |345|    psy|   64|
// |992|   chem|   76|
// |223|   math|   45|
// +---+-------+-----+

import org.apache.spark.sql.functions.when

df1.alias("df1")
  .join(df2.alias("df2"), Seq("id"))
  .select(
    $"id",
    when($"df1.score" < $"df2.score", $"df1.subject").otherwise($"df2.subject").alias("subject"),
     least($"df1.score", $"df2.score").alias("score"))

// +---+-------+-----+
// | id|subject|score|
// +---+-------+-----+
// |345|    psy|   64|
// |992|   chem|   76|
// |223|   math|   45|
// +---+-------+-----+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-02-19
    • 2020-06-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多