【发布时间】:2022-01-09 16:27:08
【问题描述】:
我有一个一般的 SQL 查询问题,我认为可以用各种 SQL 风格来回答,尽管我下面的示例使用的是 spark sql。
我正在尝试将表 1 (t1) 左连接到表 2 (t2),目标如下:
- 保留
t1中的所有值(因此左连接) - 根据
t2中的聚合函数选择t2中的列
以下是一些测试数据:
t1
+---+--------+
|pk1|constant|
+---+--------+
| a|constant|
| b|constant|
| c|constant|
| d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
| a| 1|check1|
| a| 2|check2|
| b| 1|check1|
| b| 2|check2|
+---+---------+------+
这里有几个(失败的)示例查询:
spark.sql("""
select
pk1,
constant,
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
having
min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
| b|constant|check1|
| a|constant|check2|
| a|constant|check1|
| b|constant|check2|
+---+--------+------+
查询 1 的问题:我丢失了 pk1 在 'c','d' 中的 t1 行。对我来说,它看起来像是内连接,而不是左连接。
spark.sql("""
select
pk1,
constant,
min(condition),
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| a|constant| 2|check2|
| b|constant| 2|check2|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
查询 2 的问题:我不再对条件的最小值进行过滤。比如pk1 = a,我取了condition = 1和condition = 2。 min 函数似乎没有应用。
期望的输出
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
min(condition) 列是可选的。反正我以后会过滤掉的。
我可以通过将查询分成两个语句来创建所需的输出,但我觉得这里必须有一个优雅的单查询解决方案。有人知道如何做到这一点吗?谢谢!
附录
以下是构建测试表的命令,以防有人想复制测试:
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1","condition","target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
【问题讨论】:
标签: sql apache-spark apache-spark-sql left-join