【问题标题】:(PySpark) Nested lists after reduceByKey(PySpark) reduceByKey 之后的嵌套列表
【发布时间】:2014-01-31 08:40:33
【问题描述】:

我确定这很简单,但我没有找到与此相关的任何内容。

我的代码很简单:

... 
stream = stream.map(mapper) 
stream = stream.reduceByKey(reducer) 
... 

没什么特别的。输出看起来像这样:

... 
key1  value1 
key2  [value2, value3] 
key3  [[value4, value5], value6] 
... 

等等。所以,有时我得到一个固定的值(如果它是单一的)。有时 - 嵌套列表可能非常非常深(在我的简单测试数据中它是 3 级深)。

我尝试通过来源搜索“flat”之类的东西 - 但只发现 flatMap 方法(据我了解)不是我需要的。

我不知道为什么这些列表是嵌套的。我的猜测是它们是由不同的流程(工人?)处理的,然后在没有展平的情况下连接在一起。

当然,我可以用 Python 编写代码来展开该列表并将其展平。但我相信这不是一个正常的情况——我认为几乎每个人都需要一个平坦的输出。

itertools.chain 在找到不可迭代的值时停止展开。换句话说,它仍然需要一些编码(上一段)。

那么 - 如何使用 PySpark 的本机方法展平列表?

谢谢

【问题讨论】:

  • 你的reduce函数是什么(reducer)?
  • @JoshRosen 只需“返回 [key, value]”

标签: python apache-spark


【解决方案1】:

这里的问题是你的 reduce 函数。对于每个键,reduceByKey 使用值对调用您的 reduce 函数,并期望它生成相同类型的组合值。

例如,假设我想执行字数统计操作。首先,我可以将每个单词映射到(word, 1) 对,然后我可以reduceByKey(lambda x, y: x + y) 来总结每个单词的计数。最后,我得到了 (word, count) 对的 RDD。

这是来自PySpark API Documentation 的示例:

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

要了解您的示例为什么不起作用,您可以想象一下这样应用 reduce 函数:

reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...

根据您的 reduce 函数,听起来您可能正在尝试实现内置的 groupByKey 操作,该操作将每个键与其值列表分组。

另外,看看combineByKey,它是reduceByKey() 的泛化,它允许reduce 函数的输入和输出类型不同(reduceByKeyimplementedcombineByKey 方面)

【讨论】:

  • 哎呀...我必须说,Spark 的方法与许多 MR 框架不同。在那里移植一些工作的 MRJob 或 Disco 代码需要一些时间。
【解决方案2】:

或者,stream.groupByKey().mapValues(lambda x: list(x)).collect() 给出

key1 [value1]
key2 [value2, value3]
key3 [value4, value5, value6]

【讨论】:

  • 或者只是.groupByKey().mapValues(list)
  • .reduceByKey(lambda a,b: (a if type(a) == list else [a]) + (b if type(b) == list else [b])).collect()
猜你喜欢
  • 1970-01-01
  • 2016-12-27
  • 2018-05-07
  • 1970-01-01
  • 2020-03-14
  • 2015-10-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多