【问题标题】:Python - How to pass a directory as MapReduce inputPython - 如何将目录作为 MapReduce 输入传递
【发布时间】:2017-07-04 07:08:42
【问题描述】:

我已经用 Python 编写了一个简单的 MapReduce 示例。如果输入是一个文件,例如text 文件,为了运行代码,我们只需使用以下模式:cat <data> | map | sort | reduce,例如在我的例子中是:cat data | ./mapper.py | sort | ./reducer.py,一切正常。

但我更改了映射器和缩减器以从包含 .gz 文件的 directory 读取数据。所以我应该通过path of the directory 作为输入。我测试了以下终端命令cat dat/ | ./mapper.py | sort | ./reducer.py,而包含数据的目录是dat/,但我遇到了错误:

cat: dat/: Is a directory
Traceback (most recent call last):
  File "./mapper.py", line 9, in <module>
    for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'

如何在 Python 中将目录作为输入传递给 Mapreduce?

以下是我的代码:

mapper.py

#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip

QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
    f = gzip.open(filename, 'r')
    for line in f:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != MISSING and q in QUALITY:
            print " %s\t%s" % (year, temp)

reducer.py

#!/usr/bin/env python
import sys

max_val = -sys.maxint
key = ''
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)

【问题讨论】:

  • zcat data/*.gz | ./mapper.py | sort | ./reducer.py
  • @philantrovert 谢谢,请注意我的映射器,我想输入是目录地址,包含.gz 文件,我使用for loop 像以前一样阅读它们,但不是MapReduce 模型。但我认为您的建议传递了目录中所有.gz 文件的确切地址。我说的对吗?
  • zcat (gzip + cat) 提取 .gz 文件并将其内容传递给您的映射器。也许,这将适用于您的 .gz 文件,而无需更改您的映射器。

标签: python hadoop mapreduce hadoop-streaming


【解决方案1】:

for filename in glob.glob(sys.stdin + '*.gz'): 行需要来自stdin 的字符串。因此,只需传递一个字符串 (echo) 而不是文件内容 (cat):

$ echo dat/ | ./mapper.py | sort | ./reducer.py

但是,为什么要通过管道传递参数?通常参数是由 python 通过sys.argv 直接传递和读取的(或者更好的是通过诸如“argparse”之类的解释器)。

【讨论】:

    【解决方案2】:

    要获取当前工作目录的路径,请使用:

    import os
    path = os.getcwd()
    

    您可以从此文件中获取所有文件:

    filenames = os.listdir(path)
    # filter files that doesn't have .gz filetype
    filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]
    

    您可以简单地遍历文件:

    for filename in filenames:
        f = gzip.open(path+filename, 'r')
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多