【问题标题】:PySpark reduceByKey by only one keyPySpark reduceByKey 只有一个键
【发布时间】:2018-08-23 14:04:30
【问题描述】:

我有这样的rdd

// Structure List[Tuple(x1, x2, value), Tuple(x1, x2, value)]
data = [('23', '98', 34), ('23', '89', 39), ('23', '12', 30), ('24', '12', 34), ('24', '14', 37), ('24', '16', 30)]

我正在寻找最终结果是 x1 的得分最大值以及与之关联的 x2 值。像这样

data = [('23', '89', 39), ('24', '14', 37)]

我尝试了reduceByKey,但它给了我每个组合的最大值,这不是我想要的。

来自comment

这是我尝试过的:

max_by_group = (
    data.map(lambda x: (x[0], x))
        .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) 
        .values()
)

【问题讨论】:

  • 这就是我所做的 max_by_group = ( data.map(lambda x: (x[0], x)) .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x : x[-1])) .values() )

标签: python python-3.x apache-spark pyspark


【解决方案1】:

groupBy第一个元素,然后通过元组中的第三个元素找到每个组的最大值:

(rdd.groupBy(lambda x: x[0])
    .mapValues(lambda x: max(x, key=lambda y: y[2]))
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]

或使用reduceByKey:

(rdd.map(lambda x: (x[0], x))
    .reduceByKey(lambda x, y: x if x[2] > y[2] else y)
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]

【讨论】:

  • 非常感谢。我尝试了这两种方法。自从有近十亿条记录以来,这需要一段时间。但令我惊讶的是,reduceByKey 比 groupBy 快,不知道为什么。您的 reduceByKey 解决方案也与我的几乎相同:)
  • reduceByKey在适用时比groupBy效率更高,因为reduceByKey经过优化,它在洗牌之前为每个分区组合数据;按分区聚合之后的改组可以最大限度地减少跨集群的数据传输,这通常是昂贵的。
  • 嗯,我不知道。非常感谢您分享此信息。
【解决方案2】:

如果您使用rdds,@Psidom 的答案就是您要寻找的答案。另一种选择是convert your rdd to a DataFrame

rdd = sc.parallelize(data)
df = rdd.toDF(["x1", "x2", "value"])
df.show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 98|   34|
#| 23| 89|   39|
#| 23| 12|   30|
#| 24| 12|   34|
#| 24| 14|   37|
#| 24| 16|   30|
#+---+---+-----+

现在你可以group by x1 and filter the rows with the maximum value:

import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('x1')
df.withColumn('maxValue', f.max('value').over(w))\
    .where(f.col('value') == f.col('maxValue'))\
    .drop('maxValue')\
    .show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 89|   39|
#| 24| 14|   37|
#+---+---+-----+

【讨论】:

    【解决方案3】:

    从 itertools 导入 groupby:

    [max(list(j),key=lambda x:x[2]) for i,j in groupby(data,key = lambda x:x[0])]
    
    Out[335]: [('23', '89', 39), ('24', '14', 37)]
    

    【讨论】:

    • 这适用于 python,但 OP 正在询问 spark rdds。
    猜你喜欢
    • 2015-07-02
    • 2015-12-09
    • 2015-10-17
    • 2021-10-26
    • 1970-01-01
    • 1970-01-01
    • 2018-08-07
    • 2017-05-17
    • 2015-10-02
    相关资源
    最近更新 更多