【问题标题】:Return RDD of largest N values from another RDD in SPARK从 SPARK 中的另一个 RDD 返回最大 N 值的 RDD
【发布时间】:2016-03-21 11:01:31
【问题描述】:

我正在尝试过滤元组的 RDD,以根据键值返回最大的 N 个元组。我需要返回格式为 RDD。

所以RDD:

[(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]

过滤最大的 3 个键应该返回 RDD:

[(6,'p'), (12,'e'), (49,'y')]

执行sortByKey() 然后take(N) 返回值并且不会导致RDD,所以这不起作用。

我可以返回所有的键,对它们进行排序,找到第 N 个最大值,然后在 RDD 中过滤大于该值的键值,但这似乎非常低效。

最好的方法是什么?

【问题讨论】:

  • 为什么需要返回一个RDD?您是否采用了不适合单机内存的非常大的数字?如果你不是,你可以并行化返回的集合。
  • 这个例子是从我的实际用例中简化而来的。我的键/值对的值是一个大向量,我想要来自数千个元组的数百个元组的子集。返回数百个这样的向量元组,然后将它们变成另一个 RDD 效率非常低。

标签: python apache-spark pyspark rdd


【解决方案1】:

RDD

一个快速但不是特别有效的解决方案是关注sortByKey 使用zipWithIndexfilter

n = 3
rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')])

rdd.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()

如果 n 与 RDD 大小相比相对较小,则更有效的方法是避免完全排序:

import heapq

def key(kv):
    return kv[0]

top_per_partition = rdd.mapPartitions(lambda iter: heapq.nlargest(n, iter, key))
top_per_partition.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()

如果键远小于值并且最终输出的顺序无关紧要,那么filter 方法可以正常工作:

keys = rdd.keys()
identity = lambda x: x

offset = (keys
    .mapPartitions(lambda iter: heapq.nlargest(n, iter))
    .sortBy(identity)
    .zipWithIndex()
    .filter(lambda xi: xi[1] < n)
    .keys()
    .max())

rdd.filter(lambda kv: kv[0] <= offset)

如果出现平局,它也不会保留精确的 n 值。

DataFrames

你可以orderBylimit

from pyspark.sql.functions import col

rdd.toDF().orderBy(col("_1").desc()).limit(n)

【讨论】:

  • 我是 Spark 的新手,请原谅我的无知。我阅读了 zipWithIndex() 的文档,听起来它创建的索引不会在所有分区中排序。 “排序首先基于分区索引,然后是每个分区内项目的排序。”那么它会按照 sortByKey 的顺序对所有分区中的所有内容进行索引,还是仅在每个分区内排序,然后添加分区索引?
  • 刚刚完成验证此解决方案。这看起来不错的样子。还有其他更有效的方法吗?
  • 并非如此。根据您的评论,您可以尝试仅对键进行排序,然后按照您在问题中提到的进行过滤。它会限制洗牌,但需要权衡取舍。
【解决方案2】:

一种省力的方法,因为您只想将take(N) 结果转换为新的 RDD。

sc.parallelize(yourSortedRdd.take(Nth))

【讨论】:

  • 这会在重新平行化之前将您的 N 值加载到火花驱动器上,如果您的 N 很大,这可能会失败。
猜你喜欢
  • 2016-12-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-19
  • 1970-01-01
  • 2020-03-09
  • 2016-08-29
相关资源
最近更新 更多