首先,这并不是一个非常“简单”的查询...
首先 - 让我们用一些示例数据创建数据框 - 我创建了只有时间和字符串值的小案例类,您可以用更复杂的类替换它们:
case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)
val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))
val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)
现在 - 两个替代方案(概念上相同):
-
使用 SQL:
dfA.registerTempTable("a")
dfB.registerTempTable("b")
sqlContext.sql(
"""
|SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue)
|FROM (
| SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime
| FROM a
| JOIN b ON b.time > a.time
| GROUP BY a.time) AS c
|JOIN b ON c.bTime = b.time
|GROUP BY b.time
""".stripMargin).show()
将为 b(时间和 bValue)的每个值打印时间列表和 a 值列表。
-
使用数据帧:
import org.apache.spark.sql.functions._
val aWithMinB: DataFrame = dfA
.join(dfB, dfA("time") < dfB("time"))
.groupBy(dfA("time"))
.agg(first(dfA("aValue")), min(dfB("time")))
.withColumnRenamed("FIRST(aValue)", "aValue")
.withColumnRenamed("min(time)", "bTime")
aWithMinB
.join(dfB, dfB("time") === aWithMinB("bTime"))
.groupBy(dfB("time"))
.agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue")))
.show()
请注意,两者都只适用于 Spark 1.6.0 或更高版本,因为早期版本中不存在 collect_list。
更新:这里对流程的一些解释:
- 第一个查询(SQL 版本中的inner 查询)旨在为表
a 中的所有记录创建一个“公共值”,应该分组 到一个结果中的单条记录
- 什么是共同价值?
a 中应分组的值是b 中两个连续记录之间的值。因此,它们共享b.time 的相同最小 值,即大于 然后它们的时间。换句话说 - 对于a 中的每次 X,我们都会在 b 中寻找大于 X 的最小时间。这对于所有的人来说都是相同的值 a 中两个连续的 bs 之间的记录
- 为此,我们将
a 与b 与b.time > a.time 条件相结合(对于a 的每条记录,得到b 的许多记录),然后按a.time 分组(缩小结果返回到a 中每条记录一条记录),为每条此类记录取最小 b.time 和每个a 列的first 值(取首先并不重要 - 所有分组记录对于 a 的所有列都具有相同的值!)
- 现在我们已经为
a 中的每条记录提供了这个“额外信息”,我们将它与b 连接到time 列并按该列分组。所有具有相同bTime 的a 记录将连接到相应的b 记录,我们就完成了:我们再次对b 的所有列使用first(同样,所有值都相同对于所有分组记录,因为我们根据b 的唯一标识符进行分组),并在a 的列上使用collect_list 来获取所有值作为列表。