【问题标题】:In Spark 1.6 / Scala, getting column value correlated with aggregate在 Spark 1.6 / Scala 中,获取与聚合相关的列值
【发布时间】:2017-05-19 22:51:59
【问题描述】:

假设我有一个包含三列的 DataFrame:

itemid, date, price
1, 2017-05-18, $1.10
2, 2017-05-18, $2.20
1, 2017-04-12, $0.90
1, 2017-03-29, $1.00

现在,我想按 itemid 分组,获取最早日期,并获取与最早日期匹配的价格。 (我们可以假设 (itemid, date) 是唯一的。)

上面输入的输出是:

1, 2017-03-29, $1.00
2, 2017-05-18, $2.20

在 SQL 中,我可以使用自联接来做到这一点——首先为每个 itemid 选择最短日期,然后选择价格和日期与该最短日期匹配的日期。

如何在 Scala Spark DataFrames 中表达这一点? 如果答案仍然涉及自连接,那么 Spark 1.6 中的 DataFrame 查询执行器是否足够智能,不会实际实现连接?

【问题讨论】:

    标签: scala apache-spark dataframe spark-dataframe


    【解决方案1】:

    一种方法是使用类似于以下的 SparkSQL 窗口函数:

    import org.apache.spark.sql.expressions.Window
    
    val df = Seq(
        (1, "2017-05-18", 1.10),
        (2, "2017-05-18", 2.20),
        (1, "2017-04-12", 0.90),
        (1, "2017-03-29", 1.00)
      ).toDF(
        "itemid", "date", "price"
      ).as[(Integer, String, Double)]
    
    // Add earliest date by itemid via window function and
    // keep only rows with earliest date by itemid
    val df2 = df.withColumn("earliestDate", min("date").over(
        Window.partitionBy("itemid")
      )).
      where($"date" === $"earliestDate")
    
    df2.show
    +------+----------+-----+------------+
    |itemid|      date|price|earliestDate|
    +------+----------+-----+------------+
    |     1|2017-03-29|  1.0|  2017-03-29|
    |     2|2017-05-18|  2.2|  2017-05-18|
    +------+----------+-----+------------+
    

    【讨论】:

    • 感谢您的解决方案。事实证明,这有点类似于自连接方法——因为 (itemid, date) 已经是保证唯一键,我可以使用标准聚合计算 min-price-for-id-by-date,然后重新加入。也就是说,我可以使用唯一键而不是使用行 id,而不是使用窗口,我可以只使用 groupBy()。
    • @Jon Watte,是的,使用 groupBy 和 self-join 并且 (itemid, date) 是唯一的,不需要创建唯一的列。事实上,在这种简单的情况下,如果使用窗口函数,则不需要自连接(因此当然不需要创建唯一的 rowid)。我已经更新了我的答案。
    猜你喜欢
    • 1970-01-01
    • 2021-04-07
    • 1970-01-01
    • 2018-02-21
    • 2018-03-02
    • 2017-03-10
    • 2021-04-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多