[Title]: Launch Hadoop MapReduce job via Python without PuTTy/SSH
[Posted]: 2015-12-08 20:40:31
[Question]:

I run Hadoop MapReduce jobs by logging in over SSH with PuTTY, which means entering a hostname/IP address, login name, and password into PuTTY to get an SSH command-line window. Once in the SSH console, I issue the appropriate MR command, for example:

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar -file /nfs_home/appers/user1/mapper.py -file /nfs_home/appers/user1/reducer.py -mapper '/usr/lib/python_2.7.3/bin/python mapper.py' -reducer '/usr/lib/python_2.7.3/bin/python reducer.py' -input /ccexp/data/test_xml/0901282-510179094535002-oozie-oozi-W/extractOut/*/*.xml -output /user/ccexptest/output/user1/MRoutput

What I would like to do is replace this clunky process with Python, so that I can launch the MapReduce job from a Python script instead of logging in over SSH through PuTTY.

Can this be done? If so, can someone show me how?
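The long streaming invocation above is the part worth scripting first: it is just a string that can be assembled from its pieces. A minimal sketch of that idea, assuming Python 3 on the client side; the helper name and the use of `shlex.quote` for the embedded quoting are mine, not from the original post, and the paths are the placeholders from the command above:

```python
import shlex

# Placeholders copied from the command in the question
STREAMING_JAR = ('/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/'
                 'hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar')
PYTHON = '/usr/lib/python_2.7.3/bin/python'

def build_streaming_command(mapper, reducer, input_glob, output_dir):
    """Assemble the 'hadoop jar ...' streaming invocation as one string."""
    args = [
        'hadoop', 'jar', STREAMING_JAR,
        '-file', mapper,
        '-file', reducer,
        '-mapper', '{} {}'.format(PYTHON, mapper),
        '-reducer', '{} {}'.format(PYTHON, reducer),
        '-input', input_glob,
        '-output', output_dir,
    ]
    # shlex.quote wraps any argument containing spaces or globs in quotes,
    # replacing the fragile hand-typed quoting of the interactive command
    return ' '.join(shlex.quote(a) for a in args)
```

Once the command is a single string, launching it remotely reduces to handing it to an SSH library's exec call, which is what the answer below does with paramiko.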

[Question comments]:

    Tags: python hadoop ssh paramiko


    [Solution 1]:

    I solved this with the following (Python 2) script:

    import paramiko
    
    # Define connection info
    host_ip = 'xx.xx.xx.xx'
    user = 'xxxxxxxx'
    pw = 'xxxxxxxx'
    
    # Paths
    input_loc = '/nfs_home/appers/extracts/*/*.xml'
    output_loc = '/user/lcmsprod/output/cnielsen/'
    python_path = "/usr/lib/python_2.7.3/bin/python"
    hdfs_home = '/nfs_home/appers/cnielsen/'
    output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
    
    # File names
    xml_lookup_file = 'product_lookups.xml'
    mapper = 'Mapper.py'
    reducer = 'Reducer.py'
    helper_script = 'Process.py'
    product_name = 'test1'
    output_ref = 'test65'
    
    # ----------------------------------------------------
    
    def buildMRcommand(product_name):
        space = " "
        mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                            '-files', hdfs_home+xml_lookup_file,
                            '-file', hdfs_home+mapper,
                            '-file', hdfs_home+reducer,
                            '-mapper', "'"+python_path, mapper, product_name+"'",
                            '-file', hdfs_home+helper_script,
                            '-reducer', "'"+python_path, reducer+"'",
                            '-input', input_loc,
                            '-output', output_loc+output_ref]
    
        MR_command = space.join(mr_command_list)
        print MR_command
        return MR_command
    
    # ----------------------------------------------------
    
    def unbuffered_lines(f):
        # Yield complete lines from the remote stream as they arrive,
        # rather than waiting for the command to exit
        line_buf = ""
        while not f.channel.exit_status_ready():
            line_buf += f.read(1)
            if line_buf.endswith('\n'):
                yield line_buf
                line_buf = ''
    
    # ----------------------------------------------------
    
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host_ip, username=user, password=pw)
    
    # Build Commands
    list_dir = "ls "+hdfs_home+" -l"
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+"test_011216_0.txt"
    
    # Run one command at a time; swap in the MR job or getmerge line as needed
    stdin, stdout, stderr = client.exec_command(list_dir)
    ##stdin, stdout, stderr = client.exec_command(buildMRcommand(product_name))
    ##stdin, stdout, stderr = client.exec_command(getmerge)
    
    print "Executing command..."
    writer = open(output_log, 'w')
    
    for l in unbuffered_lines(stderr):
        e = '[stderr] ' + l
        print '[stderr] ' + l.strip('\n')
        writer.write(e)
    
    for line in stdout:
        r = '[stdout] ' + line
        print '[stdout] ' + line.strip('\n')
        writer.write(r)
    
    client.close()
    writer.close()
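    A note on the `unbuffered_lines` generator in the script above: it accumulates one character at a time and yields each completed line, so remote output can be echoed while the job runs. Its buffering logic can be exercised without a live SSH connection by substituting a small stub for paramiko's channel file; the stub class below is mine, for illustration only, and returns text rather than bytes:

```python
class FakeChannelFile:
    """Stub mimicking the two things the generator uses:
    .channel.exit_status_ready() and .read(n) returning text."""
    def __init__(self, data):
        self._data = data
        self._pos = 0
        self.channel = self  # stand-in for the real paramiko Channel

    def exit_status_ready(self):
        # "Command finished" once all stub data has been consumed
        return self._pos >= len(self._data)

    def read(self, n):
        chunk = self._data[self._pos:self._pos + n]
        self._pos += n
        return chunk

def unbuffered_lines(f):
    # Same logic as the script above: buffer until a newline, then yield
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ''
```

    One caveat this exposes: any trailing output that does not end in a newline is dropped once the exit status arrives, so a final partial line never reaches the log.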
    

    [Comments]:
