【问题标题】:convert this sql left-join query to spark dataframes (scala)将此 sql 左连接查询转换为 spark 数据帧(scala)
【发布时间】:2021-12-12 07:10:18
【问题描述】:

我有这个 sql 查询,它是一个左连接,并且在开头有一个 select 语句,它也从右表列中进行选择。 您能否帮助将其转换为 spark 数据帧并使用 spark-shell 获得结果? 我不想在 spark 中使用 sql 代码,而是想使用数据帧。

我知道 scala 中的连接语法,但是当左连接的结果 df 无法访问右表时,我不知道如何从右表中进行选择(这里是 count(w.id2))列。

谢谢!

select count(x.user_id) user_id_count, count(w.id2) current_id2_count
from
    (select
        user_id
    from
        tb1
    where
        year='2021'
        and month=1
        
    ) x
left join
    (select id1, max(id2) id2 from tb2 group by id1) w
on
    x.user_id=w.id1;

在 spark 中,我将创建两个数据帧 x 和 w 并将它们连接起来:

var x = spark.sqlContext.table("tb1").where("year='2021' and month=1")
var w= spark.sqlContext.table("tb2").groupBy("id1").agg(max("id2")).alias("id2"
var joined = x.join(w, x("user_id")===w("id1"), "left")

编辑: 我对左连接感到困惑。火花出现了一些错误,即列 id2 不可用,我认为这是因为左连接生成的 df 将只有左表的列。然而原因是当我选择 max(id2) 时,我必须正确地给它一个别名。

这是一个示例和解决方案:

var x = Seq("1","2","3","4").toDF("user_id")

var w = Seq (("1", 1), ("1",2), ("3",10),("1",5),("5",4)).toDF("id1", "id2")

var z= w.groupBy("id1").agg(max("id2").alias("id2"))

val xJoinsZ= x.join(z, x("user_id") === z("id1"), "left").select(count(col("user_id").alias("user_id_count")), count(col("id2").alias("current_id2_count")))
scala> x.show(false)
+-------+
|user_id|
+-------+
|1      |
|2      |
|3      |
|4      |
+-------+
scala> z.show(false)
+---+---+                                                                       
|id1|id2|
+---+---+
|3  |10 |
|5  |4  |
|1  |5  |
+---+---+


scala> xJoinsZ.show(false)
+---------------------------------+---------------------------------+
|count(user_id AS `user_id_count`)|count(id2 AS `current_id2_count`)|
+---------------------------------+---------------------------------+
|4                                |2                                |
+---------------------------------+---------------------------------+

【问题讨论】:

  • 我不确定我是否理解您的问题。你想做什么,你面临的问题是什么?
  • 嗨 Oli,我正在尝试使用 spark-shell 中的数据框运行该 sql 查询
  • 对。我的意思是你写了 Scala 代码。它出什么问题了?有什么你做不到的吗?什么阻碍了你?
  • 我想做类似于sql命令的select和join
  • “左连接无权访问右表的列”是什么意思?你到底在哪里卡住你的编码? minimal reproducible example 使用足够多的单词、句子和对部分示例的引用来清楚完整地表达你的意思。请(始终)通过编辑而不是 cmets 进行澄清。

标签: sql scala apache-spark join left-join


【解决方案1】:

您的请求很难理解,但是我会尝试以您提供的 SQL 代码作为基线并使用 Spark 重现它。

// Reading tb1 (x) and filtering for Jan 2021, selecting only "user_id"
val x: DataFrame = spark.read
 .table("tb1")
 .filter(col("year") === "2021")
 .filter(col("mont") === "01")
 .select("user_id")

// Reading tb2 (w) and for each "id1" getting the max "id2"
val w: DataFrame = spark.read
 .table("tb2")
 .groupBy(col("id1"))
 .max("id2")

// Joining tb1 (x) and tb2 (w) on "user_id" === "id1", then counting user_id and id2
val xJoinsW: DataFrame = x
 .join(w, x("user_id") === w("id1"), "left")
 .select(count(col("user_id").as("user_id_count")), count(col("max(id2)").as("current_id2_count")))

一个小而相关的评论,当您使用 Scala 和 Spark 时,我建议您使用 val 而不是 varval 表示它是最终的,不能重新分配,而 var 可以稍后重新分配。你可以阅读更多here

最后,您可以随意更改 Spark 阅读机制。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-27
    • 1970-01-01
    • 1970-01-01
    • 2018-12-05
    相关资源
    最近更新 更多