Map 和 ReduceByKey
reduce 的输入类型和输出类型必须相同,因此如果要聚合一个列表,则必须将map 输入到列表中。然后将这些列表合并为一个列表。
组合列表
您需要一种将列表合并为一个列表的方法。 Python 提供了一些methods to combine lists。
append 修改第一个列表,将始终返回None。
x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]
extend 做同样的事情,但解包列表:
x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]
这两种方法都返回None,但您需要一个返回组合列表的方法,因此只需use the plus sign。
x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]
火花
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda actor: (actor.split(",")[0], actor)) \
# transform each value into a list
.map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \
# combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
.reduceByKey(lambda a, b: a + b)
组合键
也可以用combineByKey来解决这个问题,它在内部用于实现reduceByKey,但它更复杂,"using one of the specialized per-key combiners in Spark can be much faster"。对于上面的解决方案,您的用例已经足够简单了。
GroupByKey
也可以使用groupByKey、but it reduces parallelization 解决此问题,因此对于大数据集可能会慢得多。