【Question Title】: Why is my Hadoop Map-Reduce application processing the same data in two different reduce tasks?
【Posted】: 2017-05-24 09:04:15
【Question Description】:

I am studying the Hadoop map-reduce framework, following *Hadoop: The Definitive Guide*.

As described in the book, I have implemented a map-reduce job that reads each input file as a whole and delegates the output to a SequenceFileOutputFormat. Here are the classes I implemented:

SmallFilesToSequenceFileConverter.java

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the name of the file backing this split as the output key.
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.getName());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole file content keyed by its filename.
            context.write(filenameKey, value);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setMapperClass(SequenceFileMapper.class);
        job.setNumReduceTasks(2);

        return job.waitForCompletion(true) ? 0 : 1;

    }

    public static void main(String[] args) throws Exception{

        String argg[] = {"/Users/bng/Documents/hadoop/inputFromBook/smallFiles",
        "/Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter"}; 

        int exitcode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), argg);
        System.exit(exitcode);
    }
}

WholeFileInputFormat.java

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split: each file is processed whole by a single mapper.
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

WholeFileRecordReader.java

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the whole file into a single value; let read errors
            // propagate instead of silently emitting an empty value.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
    }

}

As specified in SmallFilesToSequenceFileConverter.java, everything works fine when I use a single reduce task, and I get the expected output, shown below:

//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������xd[^•MÈÔg…h#Ÿa������a���
aaaaaaaaaa������b���
bbbbbbbbbb������c���
cccccccccc������d���
dddddddddd������dummy���ffffffffff
������e����������f���
ffffffffff
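Incidentally, rather than dumping the raw SequenceFile bytes as above, the output is easier to inspect with `hadoop fs -text`, which recognizes the SequenceFile header and deserializes each key/value pair (the path below is the output directory from this job):

```shell
# Decode the SequenceFile output instead of printing raw bytes
hadoop fs -text /Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter/part-r-00000
```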

But the problem is that when I use two reduce tasks, some of the data is processed by both reduce tasks. Here is the output with two reduce tasks:

//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������ÓÙE˜xØÏXØâÆU.êÚ������a���
aaaaaaaaaa������b�
bbbbbbbbbb������c
cccccccccc������e����

//part-r-00001
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������π¸ú∞8Á8˜lÍx∞:¿������b���
bbbbbbbbbb������d���
dddddddddd������dummy���ffffffffff
������f���
ffffffffff

This shows that both reduce tasks processed the data "bbbbbbbbbb". What could the problem be here? Or is this result actually fine? Or am I making a mistake somewhere?

For reference, the input directory contains six input files named a to f, each containing data corresponding to its file name; for example, the file named a contains the data "aaaaaaaaaa", and the other files contain similar data, except that file e is empty. There is also a file named dummy containing the data "ffffffffff".
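One way to sanity-check the two-reducer result: with Hadoop's default HashPartitioner, each distinct Text key is routed to exactly one reduce task, so "b" should never show up in both output files. The sketch below is a plain-Java approximation of that routing, assuming Text.hashCode() is the 31-based rolling hash over the key's UTF-8 bytes (as in WritableComparator.hashBytes) and that HashPartitioner computes `(hash & Integer.MAX_VALUE) % numReduceTasks`:

```java
// Plain-Java approximation of Hadoop's default partitioning for Text keys.
// Assumptions (not taken from the question): Text.hashCode() is a 31-based
// rolling hash over the UTF-8 bytes, and HashPartitioner uses
// (hashCode & Integer.MAX_VALUE) % numReduceTasks.
public class PartitionCheck {

    // Mirrors WritableComparator.hashBytes over the key's UTF-8 bytes.
    public static int textHash(String key) {
        byte[] bytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int hash = 1;
        for (byte b : bytes) {
            hash = 31 * hash + b;
        }
        return hash;
    }

    // Mirrors HashPartitioner.getPartition.
    public static int partition(String key, int numReduceTasks) {
        return (textHash(key) & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The filename keys used in this job.
        for (String k : new String[] {"a", "b", "c", "d", "dummy", "e", "f"}) {
            System.out.println(k + " -> partition " + partition(k, 2));
        }
    }
}
```

Under those assumptions, keys a, c, and e land in partition 0 while b, d, dummy, and f land in partition 1. That matches part-r-00001, so the extra "bbbbbbbbbb" record in part-r-00000 cannot come from normal partitioning of a single clean run, and points instead at leftover state from a previous run.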

【Question Comments】:

    Tags: java mapreduce hadoop2 reducers sequencefile


    【Solution 1】:

    I never found the exact cause.

    But deleting the namenode and datanode directories specified in hdfs-site.xml and restarting the HDFS, YARN, and MR services solved my problem.
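    For anyone hitting the same symptom, the reset described above looks roughly like this. This is only a sketch: the two `rm -rf` paths are placeholders for whatever `dfs.namenode.name.dir` and `dfs.datanode.data.dir` point at in your hdfs-site.xml, and wiping the namenode directory means the namenode must be reformatted, which destroys all HDFS data:

    ```shell
    # Stop the services, wipe the directories configured in hdfs-site.xml
    # (placeholder paths), reformat the namenode, and restart.
    stop-yarn.sh
    stop-dfs.sh
    rm -rf /path/to/dfs/name /path/to/dfs/data   # dfs.namenode.name.dir / dfs.datanode.data.dir
    hdfs namenode -format
    start-dfs.sh
    start-yarn.sh
    ```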

    【Discussion】:
