【Posted】: 2012-01-04 09:19:12
【Problem description】:
I got this from a Hadoop tutorial. It is a reducer that basically reads (word, count) pairs from standard input and sums the counts per word.
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    # Each input line is "word<TAB>count"; split once so the count keeps no tabs.
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # Streaming hands the reducer its lines sorted by key, so groupby sees
    # all pairs for a given word consecutively.
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            pass
Now I want it to accept tuples of the form (word, count1, count2) instead, but the groupby and sum(int(count) for current_word, count in group) business is completely opaque to me. How do I modify this block so that it keeps doing essentially what it does now, but with a second counter value?
I.e. the input is (word, count1, count2) and the output is (word, count1, count2).
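For reference, here is a minimal standalone sketch of what the groupby/sum idiom does on already-sorted input (which is what Hadoop streaming hands a reducer), together with a hypothetical two-counter variant of the same pattern. It assumes Python 2, like the snippets above, and the data and field names are only illustrative:

from itertools import groupby
from operator import itemgetter

# Reducer input arrives sorted by key, so all rows for a word are adjacent.
rows = [('apple', '1'), ('apple', '3'), ('banana', '2')]

for word, group in groupby(rows, itemgetter(0)):
    # group yields every tuple whose first field equals word;
    # the generator expression extracts the count and converts it to int.
    total = sum(int(count) for _, count in group)
    print "%s\t%d" % (word, total)        # apple 4 / banana 2

# The same pattern with two counters, summing each column while
# walking the group once:
rows3 = [('apple', '1', '10'), ('apple', '3', '30'), ('banana', '2', '20')]
for word, group in groupby(rows3, itemgetter(0)):
    t1 = t2 = 0
    for _, c1, c2 in group:
        t1 += int(c1)
        t2 += int(c2)
    print "%s\t%d\t%d" % (word, t1, t2)   # apple 4 40 / banana 2 20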
Edit 1:
from itertools import groupby, izip
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 2)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            counts_a, counts_b = izip((int(count_a), int(count_b)) for current_word, count_a, count_b in group)
            t1, t2 = sum(counts_a), sum(counts_b)
            print "%s%s%d%s%d" % (current_word, separator, t1, separator, t2)
        except ValueError:
            pass
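As an aside on the izip call above: izip(iterable) receives the generator as a single iterable of tuples, so it does not split it into two column sequences; transposing rows into columns is normally written izip(*iterable). A minimal sketch of the difference, again assuming Python 2 and made-up values:

from itertools import izip

rows = [(1, 10), (3, 30)]       # e.g. the (count_a, count_b) pairs of one group

print list(izip(rows))          # [((1, 10),), ((3, 30),)] -- one iterable, 1-tuples
cols_a, cols_b = izip(*rows)    # unpack the rows as separate arguments: transpose
print cols_a, cols_b            # (1, 3) (10, 30)
print sum(cols_a), sum(cols_b)  # 4 40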
This is a Hadoop job, and the output looks like this:
11/11/23 18:44:21 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob: map 100% reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob: map 100% reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob: map 100% reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob: map 100% reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!
From the logs:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.
【Comments】:
- And reading the docs to understand what groupby is doing is not an option?
Tags: python functional-programming list-comprehension