【问题标题】:SQL Simple Join QuerySQL 简单连接查询
【发布时间】:2016-03-30 20:27:49
【问题描述】:

我首先说我是 SQL 新手,所以这个问题可能是微不足道的。 我有两个带有时间戳键的表。 对于table 1 中的每个事件t_i,我希望table 2 中的所有事件q 这样:

q.timeStamp < t_i.timeStamp and q.timeStamp > t_{i-1}.timeStamp

也就是说,如果事件按照时间戳顺序发生:

    q1
t1  q2
    q3
    q4
t2  q5
    q6
t3  q7

那么结果查询应该是:

t1: q1
t2: q2 q3 q4
t3: q5 q6

我将 Scala 与带有 DataSet 和 DataFrame 类的 SQL Spark 一起使用,因此纯函数式“groupBy”或 SQL 查询都很好。

【问题讨论】:

  • 我们需要样本数据 - 向我们展示您的基础数据集的外观(以我们可以将粘贴复制到我们自己的 shell 的方式)。否则我们不知道如何正确转换您的数据!
  • 我没有报告数据,因为我将它包装在多个案例类中以便于操作。所以我会粘贴原始数据
  • @KatyaHandler 我刚刚添加了数据的快照。在原始数据集中,DATE 字段也会发生变化,应在查询中考虑

标签: sql scala join apache-spark apache-spark-sql


【解决方案1】:

首先,这并不是一个非常“简单”的查询...

首先 - 让我们用一些示例数据创建数据框 - 我创建了只有时间和字符串值的小案例类,您可以用更复杂的类替换它们:

case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)

val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))

val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)

现在 - 两个替代方案(概念上相同):

  1. 使用 SQL

    dfA.registerTempTable("a")
    dfB.registerTempTable("b")
    
    sqlContext.sql(
      """
        |SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue)
        |FROM (
        |  SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime
        |  FROM a
        |  JOIN b ON b.time > a.time
        |  GROUP BY a.time) AS c
        |JOIN b ON c.bTime = b.time
        |GROUP BY b.time
      """.stripMargin).show()
    

    将为 b(时间和 bValue)的每个值打印时间列表和 a 值列表。

  2. 使用数据帧

    import org.apache.spark.sql.functions._
    
    val aWithMinB: DataFrame = dfA
      .join(dfB, dfA("time") < dfB("time"))
      .groupBy(dfA("time"))
      .agg(first(dfA("aValue")), min(dfB("time")))
      .withColumnRenamed("FIRST(aValue)", "aValue")
      .withColumnRenamed("min(time)", "bTime")
    
    aWithMinB
      .join(dfB, dfB("time") === aWithMinB("bTime"))
      .groupBy(dfB("time"))
      .agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue")))
      .show()
    

请注意,两者都只适用于 Spark 1.6.0 或更高版本,因为早期版本中不存在 collect_list

更新:这里对流程的一些解释:

  • 第一个查询(SQL 版本中的inner 查询)旨在为表a 中的所有记录创建一个“公共值”,应该分组 到一个结果中的单条记录
  • 什么是共同价值? a 中应分组的值是b 中两个连续记录之间的值。因此,它们共享b.time 的相同最小 值,即大于 然后它们的时间。换句话说 - 对于a 中的每次 X,我们都会在 b 中寻找大于 X 的最小时间。这对于所有的人来说都是相同的值 a 中两个连续的 bs 之间的记录
  • 为此,我们将abb.time &gt; a.time 条件相结合(对于a 的每条记录,得到b 的许多记录),然后按a.time 分组(缩小结果返回到a 中每条记录一条记录),为每条此类记录取最小 b.time 和每个a 列的first 值(取首先并不重要 - 所有分组记录对于 a 的所有列都具有相同的值!)
  • 现在我们已经为a 中的每条记录提供了这个“额外信息”,我们将它与b 连接到time 列并按该列分组。所有具有相同bTimea 记录将连接到相应的b 记录,我们就完成了:我们再次对b 的所有列使用first(同样,所有值都相同对于所有分组记录,因为我们根据b 的唯一标识符进行分组),并在a 的列上使用collect_list 来获取所有值作为列表。

【讨论】:

  • 这可行,但我不清楚其背后的原因,因为我以前没有使用 SQL 的经验,你能向我解释一下吗?
  • 添加了详细说明,希望对您有所帮助。如果没有,对不起:( 拿个SQL教程来理解join、where、group...的逻辑含义...
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-07-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多