【问题标题】:mrjob: setup logging on EMRmrjob:在 EMR 上设置日志记录
【发布时间】:2014-11-25 05:14:43
【问题描述】:

我正在尝试使用 mrjob 在 EMR 上运行 hadoop,但不知道如何设置日志记录(用户在 map/reduce 步骤中生成的日志),因此我将能够在集群终止后访问它们。

我曾尝试使用logging 模块、printsys.stderr.write() 设置日志记录,但到目前为止没有运气。对我有用的唯一选择是将日志写入文件,然后通过 SSH 连接机器并读取它,但这很麻烦。我希望将我的日志转到 stderr/stdout/syslog 并自动收集到 S3,以便在集群终止后查看它们。

这里是带有日志记录的 word_freq 示例:

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper_init(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
        self.logger.addHandler(logging.StreamHandler())
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.addHandler(logging.handlers.SysLogHandler())

    def mapper(self, _, line):
        self.logger.info("Test logging: %s", line)
        sys.stderr.write("Test stderr: %s\n" % line)
        print "Test print: %s" % line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

【问题讨论】:

    标签: python hadoop logging mapreduce mrjob


    【解决方案1】:

    这是一个登录 stdout (python3) 的示例

    from mrjob.job import MRJob
    from mrjob.job import MRStep
    from mrjob.util import log_to_stream, log_to_null
    import re
    import sys
    import logging
    
    log = logging.getLogger(__name__)
    
    WORD_RE = re.compile(r'[\w]+')
    
    class MostUsedWords(MRJob):
    
        def set_up_logging(cls, quiet=False, verbose=False, stream=None):  
            log_to_stream(name='mrjob', debug=verbose, stream=stream)
            log_to_stream(name='__main__', debug=verbose, stream=stream)
    
        def steps(self):
            return [
                MRStep (mapper = self.mapper_get_words,
                        combiner = self.combiner_get_words,
                        reducer = self.reduce_get_words),
                MRStep (reducer = self.reducer_find_max)
            ]
            pass
        def mapper_get_words(self,  _, line):
            for word in WORD_RE.findall(line):
                yield (word.lower(), 1)
    
        def combiner_get_words(self, word, counts):
            yield (word, sum(counts))
    
        def reduce_get_words(self, word, counts):
            log.info(word + "\t" +str(list(counts)) )
            yield None, (sum(counts), word)
    
        def reducer_find_max(self, key, value):
            # value is pairs i.e., tuples
            yield max(value)
    
    
    if __name__ == '__main__':
        MostUsedWords.run()
    

    【讨论】:

      【解决方案2】:

      在所有选项中,唯一真正有效的是使用带有直接写入 (sys.stderr.write) 的 stderr 或使用带有 StreamHandler 的记录器到 stderr。

      稍后可以在作业完成(成功或出错)后从以下位置检索日志:

      [s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr

      请务必将日志保存在您的 runners.emr.cleanup 配置中。

      【讨论】:

      • 如果我不在 EMR 上运行我的 MRJob 脚本,你知道日志默认保存在哪里吗?
      • @crypdick 很遗憾没有
      猜你喜欢
      • 2017-07-25
      • 1970-01-01
      • 2012-09-03
      • 1970-01-01
      • 2013-08-04
      • 1970-01-01
      • 2016-11-27
      • 2023-03-23
      • 1970-01-01
      相关资源
      最近更新 更多