【Question Title】: Hadoop - Reducer is waiting for Mapper inputs?
【Posted】: 2012-05-30 00:18:01
【Problem Description】:

As the title says, when I execute my Hadoop program (and debug it in local mode), the following happens:

1. All 10 CSV rows of my test data are processed correctly in the Mapper, the Partitioner, and the RawComparator (OutputKeyComparatorClass) that is called after the map step. But the functions of the OutputValueGroupingComparatorClass and the ReduceClass are never executed afterwards.

2. My application looks like this. (Due to space constraints I have omitted the implementations of the classes I use as configuration parameters, until someone has an idea that involves them):

public class RetweetApplication {

    public static int DEBUG = 1;
    static String INPUT = "/home/ema/INPUT-H";
    static String OUTPUT = "/home/ema/OUTPUT-H " + (new Date()).toString();

    public static void main(String[] args) {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(RetweetApplication.class);

        if (DEBUG > 0) {
            conf.set("mapred.job.tracker", "local");
            conf.set("fs.default.name", "file:///");
            conf.set("dfs.replication", "1");
        }

        FileInputFormat.setInputPaths(conf, new Path(INPUT));
        FileOutputFormat.setOutputPath(conf, new Path(OUTPUT));

        //conf.setOutputKeyClass(Text.class);
        //conf.setOutputValueClass(Text.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setMapperClass(RetweetMapper.class);
        conf.setPartitionerClass(TweetPartitioner.class);
        conf.setOutputKeyComparatorClass(TwitterValueGroupingComparator.class);
        conf.setOutputValueGroupingComparator(TwitterKeyGroupingComparator.class);
        conf.setReducerClass(RetweetReducer.class);

        conf.setOutputFormat(TextOutputFormat.class);

        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. I get the following console output (sorry for the formatting, but somehow this log did not come out formatted correctly):

12/05/22 03:51:05 INFO mapred.MapTask: io.sort.mb = 100
12/05/22 03:51:05 INFO mapred.MapTask: data buffer = 79691776/99614720
12/05/22 03:51:05 INFO mapred.MapTask: record buffer = 262144/327680
12/05/22 03:51:06 INFO mapred.JobClient: map 0% reduce 0%
12/05/22 03:51:11 INFO mapred.LocalJobRunner: file:/home/ema/INPUT-H/tweets:0+967
12/05/22 03:51:12 INFO mapred.JobClient: map 39% reduce 0%
12/05/22 03:51:14 INFO mapred.LocalJobRunner: file:/home/ema/INPUT-H/tweets:0+967
12/05/22 03:51:15 INFO mapred.MapTask: Starting flush of map output
12/05/22 03:51:15 INFO mapred.MapTask: Finished spill 0
12/05/22 03:51:15 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/05/22 03:51:15 INFO mapred.JobClient: map 79% reduce 0%
12/05/22 03:51:17 INFO mapred.LocalJobRunner: file:/home/ema/INPUT-H/tweets:0+967
12/05/22 03:51:17 INFO mapred.LocalJobRunner: file:/home/ema/INPUT-H/tweets:0+967
12/05/22 03:51:17 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/05/22 03:51:17 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@35eed0
12/05/22 03:51:17 INFO mapred.ReduceTask: ShuffleRamManager: MemoryLimit=709551680, MaxSingleShuffleLimit=177387920
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Thread started: Thread for merging on-disk files
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Thread waiting: Thread for merging on-disk files
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Thread started: Thread for merging in memory files
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Need another 1 map output(s) where 0 is already in progress
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Scheduled 0 outputs (0 slow hosts and 0 dup hosts)
12/05/22 03:51:17 INFO mapred.ReduceTask: attempt_local_0001_r_000000_0 Thread started: Thread for polling Map Completion Events
12/05/22 03:51:18 INFO mapred.JobClient: map 100% reduce 0%
12/05/22 03:51:23 INFO mapred.LocalJobRunner: reduce > copy >

The last two lines ("map 100% reduce 0%" and "reduce > copy >") repeat endlessly from this point on.

4. After the mapper has seen every tuple, a lot of open threads remain active:

RetweetApplication (1) [Remote Java Application]    
    OpenJDK Client VM[localhost:5002]   
        Thread [main] (Running) 
        Thread [Thread-2] (Running) 
        Daemon Thread [communication thread] (Running)  
        Thread [MapOutputCopier attempt_local_0001_r_000000_0.0] (Running)  
        Thread [MapOutputCopier attempt_local_0001_r_000000_0.1] (Running)  
        Thread [MapOutputCopier attempt_local_0001_r_000000_0.2] (Running)  
        Thread [MapOutputCopier attempt_local_0001_r_000000_0.4] (Running)  
        Thread [MapOutputCopier attempt_local_0001_r_000000_0.3] (Running)  
        Daemon Thread [Thread for merging on-disk files] (Running)  
        Daemon Thread [Thread for merging in memory files] (Running)    
        Daemon Thread [Thread for polling Map Completion Events] (Running)  

Is there any reason why Hadoop expects more mapper output (see the repeating lines in the log above) than what I put into the input directory? As mentioned, I verified in the debugger that all inputs are processed correctly in the mapper/partitioner/etc.

UPDATE: With Chris's help (see the comments), I found out that my program is not started in local mode as I expected: the isLocal variable in the ReduceTask class is set to false, although it should be true.

It is completely unclear to me why this happens, since the three options that have to be set to enable standalone mode are set correctly. Surprisingly, the "local" setting is ignored while the "read from normal disk" setting is not, which is strange IMHO, because I thought local mode and the file:/// protocol were coupled.

While debugging ReduceTask, I forced the isLocal variable to true by evaluating isLocal=true in the debug view, and then tried to execute the rest of the program. It did not work; this is the stack trace:

12/05/22 14:28:28 INFO mapred.LocalJobRunner: 
12/05/22 14:28:28 INFO mapred.Merger: Merging 1 sorted segments
12/05/22 14:28:28 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1956 bytes
12/05/22 14:28:28 INFO mapred.LocalJobRunner: 
12/05/22 14:28:29 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.default.name;  Ignoring.
12/05/22 14:28:29 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.job.tracker;  Ignoring.
12/05/22 14:28:30 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 0 time(s).
12/05/22 14:28:31 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 1 time(s).
12/05/22 14:28:32 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 2 time(s).
12/05/22 14:28:33 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 3 time(s).
12/05/22 14:28:34 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 4 time(s).
12/05/22 14:28:35 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 5 time(s).
12/05/22 14:28:36 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 6 time(s).
12/05/22 14:28:37 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 7 time(s).
12/05/22 14:28:38 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 8 time(s).
12/05/22 14:28:39 INFO ipc.Client: Retrying connect to server: master/127.0.0.1:9001. Already tried 9 time(s).
12/05/22 14:28:39 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.default.name;  Ignoring.
12/05/22 14:28:39 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.job.tracker;  Ignoring.
12/05/22 14:28:39 WARN mapred.LocalJobRunner: job_local_0001
java.net.ConnectException: Call to master/127.0.0.1:9001 failed on connection exception: java.net.ConnectException: Connection refused
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1095)
    at org.apache.hadoop.ipc.Client.call(Client.java:1071)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at $Proxy1.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123)
    at org.apache.hadoop.mapred.ReduceTask$OldTrackingRecordWriter.<init>(ReduceTask.java:446)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:490)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:260)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:592)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:489)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:434)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:184)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1202)
    at org.apache.hadoop.ipc.Client.call(Client.java:1046)
    ... 17 more
12/05/22 14:28:39 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.default.name;  Ignoring.
12/05/22 14:28:39 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.job.tracker;  Ignoring.
12/05/22 14:28:39 INFO mapred.JobClient: Job complete: job_local_0001
12/05/22 14:28:39 INFO mapred.JobClient: Counters: 20
12/05/22 14:28:39 INFO mapred.JobClient:   File Input Format Counters 
12/05/22 14:28:39 INFO mapred.JobClient:     Bytes Read=967
12/05/22 14:28:39 INFO mapred.JobClient:   FileSystemCounters
12/05/22 14:28:39 INFO mapred.JobClient:     FILE_BYTES_READ=14093
12/05/22 14:28:39 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=47859
12/05/22 14:28:39 INFO mapred.JobClient:   Map-Reduce Framework
12/05/22 14:28:39 INFO mapred.JobClient:     Map output materialized bytes=1960
12/05/22 14:28:39 INFO mapred.JobClient:     Map input records=10
12/05/22 14:28:39 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/05/22 14:28:39 INFO mapred.JobClient:     Spilled Records=10
12/05/22 14:28:39 INFO mapred.JobClient:     Map output bytes=1934
12/05/22 14:28:39 INFO mapred.JobClient:     Total committed heap usage (bytes)=115937280
12/05/22 14:28:39 INFO mapred.JobClient:     CPU time spent (ms)=0
12/05/22 14:28:39 INFO mapred.JobClient:     Map input bytes=967
12/05/22 14:28:39 INFO mapred.JobClient:     SPLIT_RAW_BYTES=82
12/05/22 14:28:39 INFO mapred.JobClient:     Combine input records=0
12/05/22 14:28:39 INFO mapred.JobClient:     Reduce input records=0
12/05/22 14:28:39 INFO mapred.JobClient:     Reduce input groups=0
12/05/22 14:28:39 INFO mapred.JobClient:     Combine output records=0
12/05/22 14:28:39 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
12/05/22 14:28:39 INFO mapred.JobClient:     Reduce output records=0
12/05/22 14:28:39 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
12/05/22 14:28:39 INFO mapred.JobClient:     Map output records=10
12/05/22 14:28:39 INFO mapred.JobClient: Job Failed: NA
java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265)
    at uni.kassel.macek.rtprep.RetweetApplication.main(RetweetApplication.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Since this stack trace shows me that port 9001 is used during execution, I guess that the XML configuration files somehow override the settings made in Java (which I use for testing). That is strange, because I have read over and over on the internet that Java overrides the XML configuration. If nobody knows how to correct this, I will try simply deleting all the configuration XMLs. Maybe that will solve the problem…

NEW UPDATE

Renaming Hadoop's conf folder solved the problem of the waiting copiers, and the program now executes through to the end. Unfortunately, execution no longer waits for my debugger, although HADOOP_OPTS is set correctly.

RESUME: This was just a configuration issue: the XML may (for some configuration parameters) override the Java settings. If someone knows how I can get debugging running again, that would be perfect, but for now I am just glad I never have to see this stack trace again! ;)
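The override behaviour described in the resume can be sketched in plain Java. This is a hypothetical mock, not Hadoop's actual Configuration class: it only illustrates the pattern visible in the warnings above, namely that once a loaded resource marks a key as final, later programmatic set() calls on that key are ignored with a warning.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of Hadoop's final-parameter semantics:
// keys marked final by a loaded XML resource cannot be overridden
// by later set() calls from driver code.
public class FinalParamSketch {
    private final Map<String, String> props = new HashMap<>();
    private final Set<String> finalParams = new HashSet<>();

    // Simulates loading one property from a site XML file,
    // optionally marked <final>true</final>.
    public void loadFromXml(String key, String value, boolean isFinal) {
        props.put(key, value);
        if (isFinal) {
            finalParams.add(key);
        }
    }

    // Simulates JobConf.set(): silently ignored (with a warning)
    // when the key was pinned as final by the XML.
    public void set(String key, String value) {
        if (finalParams.contains(key)) {
            System.out.println("WARN: attempt to override final parameter: "
                    + key + ";  Ignoring.");
            return;
        }
        props.put(key, value);
    }

    public String get(String key) {
        return props.get(key);
    }

    public static void main(String[] args) {
        FinalParamSketch conf = new FinalParamSketch();
        // The site XML pins the job tracker as final...
        conf.loadFromXml("mapred.job.tracker", "master:9001", true);
        // ...so the driver's attempt to force local mode is ignored.
        conf.set("mapred.job.tracker", "local");
        System.out.println(conf.get("mapred.job.tracker")); // prints master:9001
    }
}
```

Under this model, the driver's conf.set("mapred.job.tracker", "local") call is exactly the write that gets dropped, which matches the "attempt to override final parameter" warnings in the log.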

Thanks to Chris for the time and effort he put into this!

【Discussion】:

  • You definitely have something strange going on, because the MapOutputCopier threads should not be running in local mode. What version of Hadoop are you using?
  • Hi Chris, thanks for your reply! I am using v1.0.2, but I have not started the daemons. When I run jps, I only see jps itself running.
  • Can you paste your full driver code?
  • What is this "driver"? Do you mean the other classes involved?
  • The first code block you posted is usually called the driver - the class containing the main(String args[]) block, which should extend the Configured class and implement the Tool interface

Tags: hadoop local reduce


【Solution 1】:

Sorry, I didn't spot this earlier, but it looks like two important configuration properties are marked final in your conf XML files, as shown by the following log statements:

12/05/22 14:28:29 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.default.name;  Ignoring.
12/05/22 14:28:29 WARN conf.Configuration: file:/tmp/hadoop-ema/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.job.tracker;  Ignoring.

This means your job cannot actually run in local mode: it starts up in local mode, but the reducer reads the serialized job configuration, determines it is not in local mode, and tries to fetch the map outputs via the task tracker port.
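For illustration, a pinned property in one of the site files would look roughly like this (the property name comes from the warnings above; the value shown is an assumption):

```xml
<!-- e.g. in mapred-site.xml: the <final>true</final> flag tells
     Configuration to ignore any later set() on this key, including
     conf.set("mapred.job.tracker", "local") in the driver. -->
<property>
  <name>mapred.job.tracker</name>
  <value>master:9001</value>
  <final>true</final>
</property>
```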

You say your workaround was to rename the conf folder - this restores Hadoop to its default configuration, in which these two properties are not marked final.
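A less drastic check than renaming the whole folder is to grep the conf directory for pinned properties. The HADOOP_CONF_DIR default below is an assumption; adjust it to your installation:

```shell
# Assumption: HADOOP_CONF_DIR points at your Hadoop conf directory
# (e.g. /usr/local/hadoop/conf); adjust the path to your install.
HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop/conf}

# Show every property pinned with <final>true</final>, together with
# the surrounding <name>/<value> lines, to spot locked-down keys such
# as mapred.job.tracker or fs.default.name.
grep -B3 -A1 "<final>true</final>" "$HADOOP_CONF_DIR"/*-site.xml
```

Any key that appears here cannot be overridden from driver code, so removing the final flag (or the whole property) restores the expected "Java overrides XML" behaviour.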

【Discussion】:
