【问题标题】:Find distinct values for each column in an RDD in PySpark在 PySpark 中为 RDD 中的每一列查找不同的值
【发布时间】:2016-08-20 19:02:48
【问题描述】:

我有一个非常长(几十亿行)和相当宽(几百列)的 RDD。我想在每列中创建唯一值集(这些集不需要并行化,因为它们每列包含不超过 500 个唯一值)。

这是我目前所拥有的:

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))

我在这里所做的是尝试创建一个空集列表,一个用于我的 RDD 中的每一列。对于聚合的第一部分,我想逐行遍历data,将n 列中的值添加到我的集合列表中的nth 集合中。如果该值已经存在,它不会做任何事情。然后,它随后执行集合的union,因此在所有分区中只返回不同的值。

当我尝试运行此代码时,我收到以下错误:

AttributeError: 'list' object has no attribute 'add'

我认为问题在于我没有准确地表明我正在遍历集合列表 (empty_sets),并且我正在遍历 data 中每一行的列。我相信(lambda a, b: a.add(b)) aempty_setsbdata.first() (整行,而不是单个值)。这显然不起作用,也不是我想要的聚合。

如何遍历我的集合列表和数据框的每一行,以将每个值添加到其对应的集合对象中?

所需的输出如下所示:

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


P.S 我看过这个例子here,它与我的用例非常相似(这是我首先想到使用aggregate 的地方)。但是,我发现代码很难转换成 PySpark,我很不清楚 casezip 代码在做什么。

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    有两个问题。一,您的组合器函数假定每一行都是一个集合,但您正在对集合列表进行操作。二,add 不返回任何内容(尝试a = set(); b = a.add('1'); print b),因此您的第一个组合器函数返回Nones 的列表。要解决此问题,请将您的第一个组合器函数设为非匿名并让它们都循环集合列表:

    def set_plus_row(sets, row):
        for i in range(len(sets)):
            sets[i].add(row[i])
        return sets
    
    
    unique_values_per_column = data.aggregate(
        empty_sets, 
        set_plus_row, # can't be lambda b/c add doesn't return anything
        lambda x, y: [a.union(b) for a, b in zip(x, y)]
    )
    

    我不确定zip 在 Scala 中做了什么,但在 Python 中,它需要两个列表并将每个相应的元素放在一起到元组中(尝试x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y);),因此您可以同时循环两个列表。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-01-15
      • 2021-05-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多