【问题标题】:Groupby and list comprehension headache in pythonpython中的Groupby和列表理解头痛
【发布时间】:2012-01-04 09:19:12
【问题描述】:

我从 Hadoop 教程中得到了这个。它是一个 reducer,基本上从标准输入中获取 (word, count) 对并将它们求和。

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_uppercount = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            pass

现在,我希望能够接收元组(word、count1、count2),但是 groupbysum(int(count for current_word, count in group) 业务对我来说完全难以辨认。我如何修改这个块,以便它基本上继续做它现在所做的,但是有第二个计数器值? IE。输入是 (word, count1, count2),输出是 (word, count1, count2)。

编辑 1:

from itertools import groupby, izip
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 2)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            counts_a, counts_b = izip((int(count_a), int(count_b)) for current_word, count_a, count_b in group)
            t1, t2 = sum(counts_a), sum(counts_b)
            print "%s%s%d%s%d" % (current_word, separator, t1, separator, t2)
        except ValueError:
            pass

这是一个 Hadoop 作业,所以输出如下:

11/11/23 18:44:21 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob:  map 100%  reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob:  map 100%  reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob:  map 100%  reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!

来自日志:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

【问题讨论】:

  • 并阅读 docs 以了解 groupby 在做什么不是一种选择?

标签: python functional-programming list-comprehension


【解决方案1】:

分组

这是来自itertools 模块的groupby 函数,记录在heredata 是对每个元素应用itemgetter(0)operator 模块中的itemgetter 类的一个实例,记录在here)的结果“分组”的。它返回成对的 (key result, iterator-over-elements-with-that-key)。因此,每次循环时,current_word 是一组 data 行共有的“单词”(索引 0,即第一项,由 itemgetter 提取),group 是以word 开头的data 行的迭代器。如代码文档中所述,文件的每一行都有两个单词:一个实际的“单词”和一个计数(旨在解释为数字的文本)

sum(int(count) for current_word, count in group)

正是它所说的count 的整数值之和,对于在group 中找到的每个 (current_word, count) 对。如上所述,每个group 都是来自data 的一组行。因此,我们将所有以current_word 开头的行,将它们的字符串count 值转换为整数,然后将它们相加。

我如何修改这个块,让它基本上继续做它现在所做的,但是有第二个计数器值? IE。输入是 (word, count1, count2),输出是 (word, count1, count2)。

那么,您希望每个计数代表什么,以及您希望数据来自哪里?

我将采用我认为最简单的解释:您将修改数据文件以在每行包含三个项目,并且您将获取总和分别从每列数字中提取。

groupby 将是相同的,因为我们仍在对以相同方式获得的行进行分组,并且仍在根据“单词”对它们进行分组。

sum 部分需要计算两个值:第一列数字的总和和第二列数字的总和。

当我们遍历group 时,我们将得到三个值的集合,因此我们希望将它们解压缩为三个值:例如current_word, group_a, group_b。对于其中的每一个,我们希望将整数转换应用于每一行的两个数字。这给了我们一个数字对序列;如果我们想将所有第一个数字和所有第二个数字相加,那么我们应该制作一对数字序列。为此,我们可以使用另一个名为izipitertools 函数。然后,我们可以分别对每一个进行求和,方法是再次将它们解压缩为两个单独的数字序列变量,然后对它们求和。

因此:

counts_a, counts_b = izip(
    (int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
total_a, total_b = sum(counts_a), sum(counts_b)

或者我们可以通过再次执行相同的 (x for y in z) 技巧来进行一对计数:

totals = (
    sum(counts)
    for counts in izip(
        (int(count_a), int(count_b)) for current_word, count_a, count_b in group
    )
)

虽然在打印语句中使用该结果会有些棘手:)

【讨论】:

  • 如果你指定了它是如何失败的,它将帮助我找出问题所在。
  • 以上内容非常有用,但是 sn-ps 代码实际上都不适用于我的情况。我已经编辑了我的帖子以包含更新的代码 + 输出。有什么想法吗?
【解决方案2】:
from collections import defaultdict

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    counts = defaultdict(lambda: [0, 0])
    for word, (count1, count2) in data:
        values = counts[word]
        values[0] += count1
        values[1] += count2

    for word, (count1, count2) in counts.iteritems():
        print('{0}\t{1}\t{2}'.format(word, count1, count2))

【讨论】:

  • 这当然是一种有效的方法,但我的印象是 OP 想要真正理解函数式编程模式和习语。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-11-08
  • 1970-01-01
  • 2017-11-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-08-05
相关资源
最近更新 更多