【问题标题】:Spark groupby, sort values, then take first and lastSpark groupby,对值进行排序,然后取第一个和最后一个
【发布时间】:2019-03-24 18:20:39
【问题描述】:

我正在使用 Apache Spark,并且有一个如下所示的数据框:

scala> df.printSchema
root
 |-- id: string (nullable = true)
 |-- epoch: long (nullable = true)


scala> df.show(10)
+--------------------+-------------+
|                 id |        epoch|
+--------------------+-------------+
|6825a28d-abe5-4b9...|1533926790847|
|6825a28d-abe5-4b9...|1533926790847|
|6825a28d-abe5-4b9...|1533180241049|
|6825a28d-abe5-4b9...|1533926790847|
|6825a28d-abe5-4b9...|1532977853736|
|6825a28d-abe5-4b9...|1532531733106|
|1eb5f3a4-a68c-4af...|1535383198000|
|1eb5f3a4-a68c-4af...|1535129922000|
|1eb5f3a4-a68c-4af...|1534876240000|
|1eb5f3a4-a68c-4af...|1533840537000|
+--------------------+-------------+
only showing top 10 rows

我想按id 字段进行分组,以获得id 的所有纪元时间戳。然后我想通过升序时间戳对纪元进行排序,然后取第一个和最后一个纪元。

我使用了以下查询,但 firstlast 纪元值似乎是按照它们在原始数据框中出现的顺序进行的。我希望按升序排列第一个和最后一个。

scala> val df2 = df2.groupBy("id").
                 agg(first("epoch").as("first"), last("epoch").as("last"))

scala> df2.show()
+--------------------+-------------+-------------+                              
|                  id|        first|         last|
+--------------------+-------------+-------------+
|4f433f46-37e8-412...|1535342400000|1531281600000|
|d0cba2f9-cc04-42c...|1535537741000|1530448494000|
|6825a28d-abe5-4b9...|1533926790847|1532531733106|
|e963f265-809c-425...|1534996800000|1534996800000|
|1eb5f3a4-a68c-4af...|1535383198000|1530985221000|
|2e65a033-85ed-4e4...|1535660873000|1530494913413|
|90b94bb0-740c-42c...|1533960000000|1531108800000|
+--------------------+-------------+-------------+

如何从 epoch 升序排序的 epoch 列表中检索第一个和最后一个?

【问题讨论】:

  • 使用minmax?
  • 我稍后将使用字符串值,而不仅仅是数字时期。 min 和 max 是否也适用于字符串?
  • 我想是的。否则你需要窗口函数。
  • 带窗口的解决方案stackoverflow.com/a/45210121/1465609

标签: apache-spark apache-spark-sql


【解决方案1】:

您可以只使用 min 和 max 并将结果列转换为字符串。这是一种方法

   import org.apache.spark.sql.functions._
val df = Seq(("6825a28d-abe5-4b9",1533926790847.0),
("6825a28d-abe5-4b9",1533926790847.0),
("6825a28d-abe5-4b9",1533180241049.0),
("6825a28d-abe5-4b9",1533926790847.0),
("6825a28d-abe5-4b9",1532977853736.0),
("6825a28d-abe5-4b9",1532531733106.0),
("1eb5f3a4-a68c-4af",1535383198000.0),
("1eb5f3a4-a68c-4af",1535129922000.0),
("1eb5f3a4-a68c-4af",1534876240000.0),
("1eb5f3a4-a68c-4af",1533840537000.0)).toDF("id","epoch").withColumn("epoch",($"epoch"/1000.0).cast("timestamp"))

    +-----------------+--------------------+
|               id|               epoch|
+-----------------+--------------------+
|6825a28d-abe5-4b9|2018-08-10 18:46:...|
|6825a28d-abe5-4b9|2018-08-10 18:46:...|
|6825a28d-abe5-4b9|2018-08-02 03:24:...|
|6825a28d-abe5-4b9|2018-08-10 18:46:...|
|6825a28d-abe5-4b9|2018-07-30 19:10:...|
|6825a28d-abe5-4b9|2018-07-25 15:15:...|
|1eb5f3a4-a68c-4af| 2018-08-27 15:19:58|
|1eb5f3a4-a68c-4af| 2018-08-24 16:58:42|
|1eb5f3a4-a68c-4af| 2018-08-21 18:30:40|
|1eb5f3a4-a68c-4af| 2018-08-09 18:48:57|
+-----------------+--------------------+

    val df1 = df.groupBy("id").agg(min($"epoch").cast("string").as("first"), max($"epoch").cast("string"). as("last"))
df1.show

    +-----------------+--------------------+--------------------+
|               id|               first|                last|
+-----------------+--------------------+--------------------+
|6825a28d-abe5-4b9|2018-07-25 15:15:...|2018-08-10 18:46:...|
|1eb5f3a4-a68c-4af| 2018-08-09 18:48:57| 2018-08-27 15:19:58|
+-----------------+--------------------+--------------------+


    df1: org.apache.spark.sql.DataFrame = [id: string, first: string ... 1 more field]

【讨论】:

    【解决方案2】:

    firstlast 函数在 Window 上下文之外应用时毫无意义。取值完全是任意的。

    相反,你应该

    • 如果逻辑符合基本排序规则(字符串、数组和结构的字母数字,数字的数字),请使用 min / max 函数。

    • 强类型数据集,map -> groupByKey -> reduceGroupsgroupByKey -> mapGroups 否则。

    【讨论】:

      猜你喜欢
      • 2016-12-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-07
      • 1970-01-01
      • 2012-10-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多