【问题标题】:Spark SQL Row_number() PartitionBy Sort DescSpark SQL Row_number() PartitionBy Sort Desc
【发布时间】:2016-05-16 18:56:05
【问题描述】:

我已经在 Spark 中使用 Window 成功创建了一个row_number()partitionBy,但我想通过降序而不是默认的升序对其进行排序。这是我的工作代码:

from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

data_cooccur.select("driver", "also_item", "unit_count", 
    F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()

这给了我这个结果:

 +------+---------+----------+------+
 |driver|also_item|unit_count|rowNum|
 +------+---------+----------+------+
 |   s10|      s11|         1|     1|
 |   s10|      s13|         1|     2|
 |   s10|      s17|         1|     3|

在这里我将 desc() 添加到降序排列:

data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()

并得到这个错误:

AttributeError: 'WindowSpec' 对象没有属性 'desc'

我在这里做错了什么?

【问题讨论】:

  • 在我的 PySpark (2.2.0) 上,我必须使用 row_number 而不是 rowNumber

标签: python apache-spark pyspark apache-spark-sql window-functions


【解决方案1】:

desc 应该应用于列而不是窗口定义。您可以在列上使用任一方法:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

F.row_number().over(
    Window.partitionBy("driver").orderBy(col("unit_count").desc())
)

或者一个独立的函数:

from pyspark.sql.functions import desc
from pyspark.sql.window import Window

F.row_number().over(
    Window.partitionBy("driver").orderBy(desc("unit_count"))
)

【讨论】:

  • 奇怪的是 pyspark orderBywindow.orderBy 不同,因为一个接受升序而另一个不接受。
【解决方案2】:

或者你可以使用 Spark-SQL 中的 SQL 代码:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .appName('Test')\
    .getOrCreate()

spark.sql("""
    select driver
        ,also_item
        ,unit_count
        ,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
    from data_cooccur
""").show()

【讨论】:

  • 我更喜欢这种方法。为我工作。
【解决方案3】:

更新实际上,我尝试对此进行更多研究,但似乎不起作用。 (实际上它会引发错误)。它不起作用的原因是我在 Databricks 中调用了 display() 下的这段代码(display() 调用之后的代码永远不会运行)。似乎数据帧上的orderBy()window 上的orderBy() 实际上并不相同。我会保持这个答案只是为了否定确认

从 PySpark 2.4 开始(可能更早),只需将关键字 ascending=False 添加到 orderBy 调用中即可。

例如。

personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))

personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))

似乎给了我同样的行为。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-04-07
    • 2016-03-12
    • 2016-06-08
    • 1970-01-01
    • 1970-01-01
    • 2020-05-26
    • 2015-12-30
    • 1970-01-01
    相关资源
    最近更新 更多