除非你能保证两个 DataFrame 的分区数相同、且每个分区中的记录数也相同,否则绕不开 join。满足这些条件时可以转换为 RDD 再 zip;否则只能 join:
import org.apache.spark.sql.functions.{least, struct}
// NOTE(review): `toDF` and the `$` column interpolator require
// `import spark.implicits._` to be in scope — assumed available here.
val df1 = Seq(
(345, "math", 70), (992, "chem", 76), (223, "bio", 80)
).toDF("id", "subject", "score")
val df2 = Seq(
(345, "psy", 64), (992, "ant", 94), (223, "math", 45)
).toDF("id", "subject", "score")
// Join on id, then pick the row with the lower score per id.
// Trick: Spark compares structs field-by-field in declaration order,
// so putting `score` first makes `least` select the struct (and its
// accompanying `subject`) with the smaller score.
df1.alias("df1")
.join(df2.alias("df2"), Seq("id"))
.select($"id",
least(struct($"df1.score", $"df1.subject"),
struct($"df2.score", $"df2.subject")).alias("score"))
// Unpack the winning struct back into flat columns.
.select($"id", $"score.subject", $"score.score")
// +---+-------+-----+
// | id|subject|score|
// +---+-------+-----+
// |345| psy| 64|
// |992| chem| 76|
// |223| math| 45|
// +---+-------+-----+
或
// Alternative: choose the subject explicitly with when/otherwise instead of
// the struct-ordering trick. `least` must be imported here as well so this
// snippet compiles on its own (it is used on the score column below).
import org.apache.spark.sql.functions.{least, when}
df1.alias("df1")
.join(df2.alias("df2"), Seq("id"))
.select(
$"id",
// On a tie (equal scores) the condition is false, so df2's subject wins.
when($"df1.score" < $"df2.score", $"df1.subject").otherwise($"df2.subject").alias("subject"),
least($"df1.score", $"df2.score").alias("score"))
// +---+-------+-----+
// | id|subject|score|
// +---+-------+-----+
// |345| psy| 64|
// |992| chem| 76|
// |223| math| 45|
// +---+-------+-----+