【问题标题】:Pyspark SQL query to get rows that are +/- 20% of a specific columnPyspark SQL 查询以获取特定列的 +/- 20% 的行
【发布时间】:2019-07-29 16:43:53
【问题描述】:

我有以下 pyspark df:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+

对于每一行,我希望能够获得资产数量在 20% 以内的行。例如,对于第一行 (ID=201542399349300619),我希望能够获取资产在 1,633,944 的 20% +/- 范围内的所有行(因此在 1,307,155 到 1,960,732 之间):

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|

使用这个子集表,我想获取平均资产并将其添加为新列。所以对于上面的例子,平均资产为 (1633944+1401406) = 1517675

+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|

【问题讨论】:

  • @pault 我的错,我已经纠正了错别字。我对 SQL 不是很熟悉,因此我只想获得一些指导,了解如何利用现有的技术实现这一目标。

标签: python sql pyspark apache-spark-sql pyspark-sql


【解决方案1】:

假设您的 DataFrame 具有类似于以下的架构(即 AssetsRevenue 是数字):

df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)

在您提出的条件下,您可以 join 将 DataFrame 设置为自身。加入后,可以通过取Assets列的平均值进行分组聚合。

例如:

from pyspark.sql.functions import avg, expr

df.alias("l")\
    .join(
        df.alias("r"), 
        on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
    )\
    .groupBy("l.ID", "l.Assets", "l.Revenue")\
    .agg(avg("r.Assets").alias("AvgAssets"))\
    .show()
#+------------------+--------+-------+------------------+
#|                ID|  Assets|Revenue|         AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+------------------+--------+-------+------------------+

由于我们将 DataFrame 连接到自身,我们可以使用别名来引用左表 ("l") 和右表 ("r")。上面的逻辑是在r 中的资产是l 中资产的+/20% 的条件下将l 加入r

有多种表达 +/20% 条件的方法,但我使用 spark-sql between 表达式来查找介于 Assets * 0.8Assets * 1.2 之间的行。

然后我们对左表的所有列 (groupBy) 进行聚合,并对右表中的资产进行平均。

生成的AvgAssets 列是FloatType 列,但如果您愿意,可以通过在.alias("AvgAssets") 之前添加.cast("int") 轻松地将其转换为IntegerType


另见:

【讨论】:

    猜你喜欢
    • 2013-06-10
    • 2022-01-12
    • 2020-09-14
    • 1970-01-01
    • 2018-08-27
    • 1970-01-01
    • 2014-01-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多