【问题标题】:Apache spark and python lambdaApache spark 和 python lambda
【发布时间】:2014-08-25 20:12:25
【问题描述】:

我有以下代码

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

http://spark.apache.org/examples.html我已经从这里复制了示例

我无法理解这段代码,尤其是关键字

  1. 平面图,
  2. 地图和
  3. 减少

有人可以用简单的英语解释发生了什么。

【问题讨论】:

  • 我不是专家,但我认为 flatMap 从嵌套结构(单词行列表?)构建列表,map 将该函数应用于所有元素,reduceByKey 通过键对元素进行分组(这里相同的话,我猜)并成对应用函数(这里是一个总和)。这可能会计算文本中每个单词的出现次数。
  • 如果您使用函数式语言进行函数式编程,事情会变得更加简洁和易于阅读。 IE。我强烈建议使用 Scala 而不是 OO 脚本语言。 Scala 更强大,对 Spark 的性能略高一些,并且更容易挖掘 Spark 代码。你的代码就变成了:spark.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs://...")

标签: python apache-spark


【解决方案1】:

map 是最简单的,它本质上说对序列的每个元素执行给定的操作并返回结果序列(非常类似于 foreach)。 flatMap 是一样的,但不是每个元素只返回一个元素,而是允许返回一个序列(可以为空)。这是解释difference between map and flatMap 的答案。最后reduceByKey 接受一个聚合函数(意味着它接受两个相同类型的参数并返回该类型,也应该是可交换的和关联的,否则你会得到不一致的结果),它用于聚合每个V 的每个K在您的(K,V) 对序列中。

示例*:
reduce (lambda a, b: a + b,[1,2,3,4])

这表示用+ 聚合整个列表,这样就可以了

1 + 2 = 3  
3 + 3 = 6  
6 + 4 = 10  
final result is 10

按键归约是一样的,只是你对每个唯一键进行归约。


所以在你的例子中解释它

file = spark.textFile("hdfs://...") // open text file each element of the RDD is one line of the file
counts = file.flatMap(lambda line: line.split(" ")) //flatMap is needed here to return every word (separated by a space) in the line as an Array
             .map(lambda word: (word, 1)) //map each word to a value of 1 so they can be summed
             .reduceByKey(lambda a, b: a + b) // get an RDD of the count of every unique word by aggregating (adding up) all the 1's you wrote in the last step
counts.saveAsTextFile("hdfs://...") //Save the file onto HDFS

那么,为什么要以这种方式计算字数,原因是 MapReduce 编程范式是高度可并行化的,因此可以扩展到对 TB 甚至 PB 的数据进行计算。


我不太会用python,如果我犯了错误请告诉我。

【讨论】:

    【解决方案2】:

    查看内联-cmets:

    file = spark.textFile("hdfs://...") # opens a file
    counts = file.flatMap(lambda line: line.split(" ")) \  # iterate over the lines, split each line by space (into words)
                 .map(lambda word: (word, 1)) \ # for each word, create the tuple (word, 1)
                 .reduceByKey(lambda a, b: a + b) # go over the tuples "by key" (first element) and sum the second elements
    counts.saveAsTextFile("hdfs://...")
    

    reduceByKey更详细的解释可以看here

    【讨论】:

    • 对不起,我不明白 reduceByKey 。在正常的 lambda 表达式中,lambda a, b: a + b 表示输入对 (a,b) 给我 a + b 的总和作为结果不是吗?但在这里它做了一些奇怪的语法?
    • 要了解reduceBykey,您首先必须了解reduce。一个简单的 reduce 示例:print reduce(lambda a,b:a+b, [1,2,3]) 它迭代一个可迭代对象并将函数(第一个参数 - 这里是 lambda 表达式)应用于前两个元素,然后将结果与第三个元素 an 等一起使用。
    • 我阿法辛我重新阅读了你的解释,我只希望我也能给你加分。你的评论消除了我对 reduceByKey 的困惑
    • @jhon.smith 很高兴我能帮上忙,这里的积分毫无意义(我不能用它们来买任何东西;)干杯!
    【解决方案3】:

    这里的答案在代码级别是准确的,但它可能有助于理解幕后发生的事情。

    我的理解是,当调用 reduce 操作时,会发生大量数据混洗,导致 map() 操作获得的所有 KV 对具有相同的键值,这些键值被分配给一个任务,该任务将这些值相加KV 对的集合。然后将这些任务分配给不同的物理处理器,然后将结果与另一个数据混洗进行比较。

    所以如果映射操作产生 (第 1 类) (第 1 类) (狗 1) (第 1 类) (第 1 类) (狗 1)

    reduce 操作产生 (第 4 类) (狗 2)

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 2022-08-11
      • 2015-09-28
      • 1970-01-01
      • 2016-09-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-01-14
      相关资源
      最近更新 更多