【发布时间】:2016-08-20 19:02:48
【问题描述】:
我有一个非常长(几十亿行)和相当宽(几百列)的 RDD。我想在每列中创建唯一值集(这些集不需要并行化,因为它们每列包含不超过 500 个唯一值)。
这是我目前所拥有的:
data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))
我在这里所做的是尝试创建一个空集列表,一个用于我的 RDD 中的每一列。对于聚合的第一部分,我想逐行遍历data,将n 列中的值添加到我的集合列表中的nth 集合中。如果该值已经存在,它不会做任何事情。然后,它随后执行集合的union,因此在所有分区中只返回不同的值。
当我尝试运行此代码时,我收到以下错误:
AttributeError: 'list' object has no attribute 'add'
我认为问题在于我没有准确地表明我正在遍历集合列表 (empty_sets),并且我正在遍历 data 中每一行的列。我相信(lambda a, b: a.add(b)) a 是 empty_sets 和 b 是 data.first() (整行,而不是单个值)。这显然不起作用,也不是我想要的聚合。
如何遍历我的集合列表和数据框的每一行,以将每个值添加到其对应的集合对象中?
所需的输出如下所示:
[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]
P.S 我看过这个例子here,它与我的用例非常相似(这是我首先想到使用aggregate 的地方)。但是,我发现代码很难转换成 PySpark,我很不清楚 case 和 zip 代码在做什么。
【问题讨论】:
标签: apache-spark pyspark