【发布时间】:2017-07-04 07:08:42
【问题描述】:
我已经用 Python 编写了一个简单的 MapReduce 示例。如果输入是一个文件,例如text 文件,为了运行代码,我们只需使用以下模式:cat <data> | map | sort | reduce,例如在我的例子中是:cat data | ./mapper.py | sort | ./reducer.py,一切正常。
但我更改了映射器和缩减器以从包含 .gz 文件的 directory 读取数据。所以我应该通过path of the directory 作为输入。我测试了以下终端命令cat dat/ | ./mapper.py | sort | ./reducer.py,而包含数据的目录是dat/,但我遇到了错误:
cat: dat/: Is a directory
Traceback (most recent call last):
File "./mapper.py", line 9, in <module>
for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
如何在 Python 中将目录作为输入传递给 Mapreduce?
以下是我的代码:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip
QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
f = gzip.open(filename, 'r')
for line in f:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if temp != MISSING and q in QUALITY:
print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys
max_val = -sys.maxint
key = ''
for line in sys.stdin:
(key, val) = line.strip().split('\t')
max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
【问题讨论】:
-
zcat data/*.gz | ./mapper.py | sort | ./reducer.py -
@philantrovert 谢谢,请注意我的映射器,我想输入是目录地址,包含
.gz文件,我使用for loop像以前一样阅读它们,但不是MapReduce 模型。但我认为您的建议传递了目录中所有.gz文件的确切地址。我说的对吗? -
zcat(gzip + cat) 提取 .gz 文件并将其内容传递给您的映射器。也许,这将适用于您的 .gz 文件,而无需更改您的映射器。
标签: python hadoop mapreduce hadoop-streaming