【Question title】: NullPointerException in toString() method of custom ArrayWritable class, MapReduce
【Posted】: 2016-11-03 15:06:57
【Question】:

I'm trying to use Time_Ant10s (a custom ArrayWritable class) as the output value of the Reducer.

I followed this helpful question: MapReduce Output ArrayWritable, but I get a NullPointerException from context.write() on the last line of the Reducer.

I suspect that get() in Time_Ant10s.toString() returns null, but I don't know why. Can you help me?

Main method

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "something");

    // general
    job.setJarByClass(CommutingTime1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    job.setNumReduceTasks(1);
    job.setInputFormatClass(TextInputFormat.class);

    // mapper output
    job.setMapOutputKeyClass(Date_Uid.class);
    job.setMapOutputValueClass(Time_Ant10.class);

    // reducer output
    job.setOutputFormatClass(CommaTextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Time_Ant10s.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Mapper

public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> {
    /* map as <date_uid, time_ant10> */
    // map() omitted
}

Reducer

public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    /* <date_uid, time_ant10> -> <date, time_ant10s> */

    private IntWritable date = new IntWritable();

    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {

        date.set(date_uid.getDate());

        // count ants
        int num = 0;
        for(Time_Ant10 time_ant10 : time_ant10s){
            num++;
        }

        if(num>=1){
            Time_Ant10[] temp = new Time_Ant10[num];

            int i=0;
            for(Time_Ant10 time_ant10 : time_ant10s){
                String time = time_ant10.getTimeStr();
                int ant10 = time_ant10.getAnt10();
                temp[i] = new Time_Ant10(time, ant10);
                i++;
            }

            context.write(date, new Time_Ant10s(temp));
        }
    }
}

Writer

public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> {
    @Override
    public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        String extension = ".txt";
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new LineRecordWriter<IntWritable, Time_Ant10s>(fileOut, ",");
    }
}

Custom Writables

// Time
public static class Time implements Writable { 
    private int h, m, s;

    public Time() {}

    public Time(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    public Time(String time) {
        String[] hms = time.split(":", 0);
        this.h = Integer.parseInt(hms[0]);
        this.m = Integer.parseInt(hms[1]);
        this.s = Integer.parseInt(hms[2]);
    }

    public void set(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    public void set(String time) {
        String[] hms = time.split(":", 0);
        this.h = Integer.parseInt(hms[0]);
        this.m = Integer.parseInt(hms[1]);
        this.s = Integer.parseInt(hms[2]);
    }

    public int[] getTime() {
        int[] time = new int[3];
        time[0] = this.h;
        time[1] = this.m;
        time[2] = this.s;
        return time;
    }

    public String getTimeStr() {
        return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s);
    }

    public int getTimeInt() {
        return this.h * 10000 + this.m * 100 + this.s;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        h = in.readInt();
        m = in.readInt();
        s = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(h);
        out.writeInt(m);
        out.writeInt(s);
    }
}
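Time stores a clock time as three ints and exposes two encodings: getTimeStr() zero-pads each field, and getTimeInt() packs the value as hh * 10000 + mm * 100 + ss, which is the form Time_Ant10s.toString() later writes to the output. The stdlib-only sketch below checks both encodings by hand. The class TimeFormatSketch and its inverse fromTimeInt are hypothetical helpers for illustration, not part of the original code.

```java
class TimeFormatSketch {
    // Same packing as Time.getTimeInt(): hh * 10000 + mm * 100 + ss
    static int toTimeInt(int h, int m, int s) {
        return h * 10000 + m * 100 + s;
    }

    // Same formatting as Time.getTimeStr(): zero-padded hh:mm:ss
    static String toTimeStr(int h, int m, int s) {
        return String.format("%1$02d:%2$02d:%3$02d", h, m, s);
    }

    // Hypothetical inverse of toTimeInt: recovers {h, m, s} from the packed value
    static int[] fromTimeInt(int packed) {
        return new int[] { packed / 10000, (packed / 100) % 100, packed % 100 };
    }

    public static void main(String[] args) {
        System.out.println(toTimeStr(8, 5, 9));  // 08:05:09
        System.out.println(toTimeInt(8, 5, 9));  // 80509 (the leading zero is not kept)
        System.out.println(java.util.Arrays.toString(fromTimeInt(80509))); // [8, 5, 9]
    }
}
```

Note that the packed form drops the leading zero of the hour, so the time column in the output file is not fixed width.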

// Time_Ant10
public static class Time_Ant10 implements Writable { 
    private Time time;
    private int ant10;

    public Time_Ant10() {
        this.time = new Time();
    }

    public Time_Ant10(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    public Time_Ant10(String time, int ant10) {
        this.time = new Time(time);
        this.ant10 = ant10;
    }

    public void set(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    public void set(String time, int ant10) {
        this.time = new Time(time);
        this.ant10 = ant10;
    }

    public int[] getTime() {
        return this.time.getTime();
    }

    public String getTimeStr() {
        return this.time.getTimeStr();
    }

    public int getTimeInt() {
        return this.time.getTimeInt();
    }

    public int getAnt10() {
        return this.ant10;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        time.readFields(in);
        ant10 = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        time.write(out);
        out.writeInt(ant10);
    }
}
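Time_Ant10 nests a Time inside a Writable, and its readFields() writes into the existing time object instead of creating one. Hadoop typically instantiates a Writable reflectively through its no-arg constructor and only then calls readFields(), which is why Time_Ant10() allocates new Time() up front. The stdlib-only sketch below imitates that round trip with java.io.DataOutput/DataInput (the same interfaces Writable uses). TimeAnt10Sketch and ClockSketch are hypothetical stand-ins for Time_Ant10 and Time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class TimeAnt10Sketch {
    // Hypothetical stand-in for Time: same three ints, same field order
    static class ClockSketch {
        int h, m, s;
        void write(DataOutput out) throws IOException {
            out.writeInt(h); out.writeInt(m); out.writeInt(s);
        }
        void readFields(DataInput in) throws IOException {
            h = in.readInt(); m = in.readInt(); s = in.readInt();
        }
    }

    private ClockSketch time;
    private int ant10;

    // Mirrors Time_Ant10(): allocate the nested object before readFields() needs it
    TimeAnt10Sketch() {
        this.time = new ClockSketch();
    }

    TimeAnt10Sketch(int h, int m, int s, int ant10) {
        this();
        time.h = h;
        time.m = m;
        time.s = s;
        this.ant10 = ant10;
    }

    void write(DataOutput out) throws IOException {
        time.write(out);       // nested fields first...
        out.writeInt(ant10);   // ...then this object's own field
    }

    void readFields(DataInput in) throws IOException {
        time.readFields(in);   // would throw NullPointerException if time were null
        ant10 = in.readInt();  // read in exactly the same order as write()
    }

    int getTimeInt() { return time.h * 10000 + time.m * 100 + time.s; }
    int getAnt10() { return ant10; }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new TimeAnt10Sketch(7, 30, 0, 4).write(new DataOutputStream(bytes));

        // Deserialize the way a framework typically would: no-arg constructor, then readFields()
        TimeAnt10Sketch copy = new TimeAnt10Sketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getTimeInt() + "," + copy.getAnt10()); // 73000,4
    }
}
```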

// Time_Ant10s
public static class Time_Ant10s extends ArrayWritable {
    public Time_Ant10s(){
        super(Time_Ant10.class);
    }

    public Time_Ant10s(Time_Ant10[] time_ant10s){
        super(Time_Ant10.class, time_ant10s);
    }

    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    @Override
    public String toString() {
        int time, ant10;
        Time_Ant10[] time_ant10s = get();
        String output = "";

        for(Time_Ant10 time_ant10: time_ant10s){
            time = time_ant10.getTimeInt();
            ant10 = time_ant10.getAnt10();
            output += time + "," + ant10 + ",";
        }

        return output;
    }
}   

// Data_Uid
public static class Date_Uid implements WritableComparable<Date_Uid> { 
    // omitted
}

Error message

java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
    at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

【Comments】:

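  • Check whether the time_ant10s array contains any null elements. If it does, see whether they can be avoided; if they can't, skip them in the for loop.
  • I checked temp for null values on the line just before context.write() in the reduce method. As you said, temp contains some nulls, but the original time_ant10s contains none. So something may be wrong in reduce...
  • I found that temp contains nothing but nulls...
  • I found that the second loop in reduce doesn't run at all, even though num >= 1. Hmm...
  • OK, I found that the Iterable cannot be iterated twice, as explained in this question.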
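The behavior found in these comments can be reproduced without Hadoop. The sketch below uses a hypothetical OneShotIterable whose iterator() always returns the same iterator, which roughly mimics the values Iterable passed to reduce() (a simplified assumption about its behavior, not Hadoop's actual class). Counting in the first loop drains it, so the second loop's body never runs and every slot in temp stays null, which is what later makes toString() throw.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class OneShotDemo {
    // Hypothetical stand-in for the reduce() values: iterator() always hands back
    // the same iterator, so once it is exhausted, later loops see nothing.
    static class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> it;
        OneShotIterable(List<T> items) { this.it = items.iterator(); }
        @Override public Iterator<T> iterator() { return it; }
    }

    // Same two-pass shape as the question's reducer
    static String[] copyTwoPass(Iterable<String> values) {
        int num = 0;
        for (String ignored : values) num++;     // first pass: drains the iterator

        String[] temp = new String[num];
        int i = 0;
        for (String v : values) temp[i++] = v;   // second pass: body never runs
        return temp;
    }

    public static void main(String[] args) {
        String[] temp = copyTwoPass(new OneShotIterable<>(Arrays.asList("a", "b", "c")));
        System.out.println(Arrays.toString(temp)); // [null, null, null]
    }
}
```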

Tags: java hadoop mapreduce


【Answer 1】:

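I found that the problem is that the Iterable passed to reduce cannot be iterated twice. So, following this page, I changed the Reducer and Time_Ant10s as shown below, and now everything works.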

@redflar3: Thank you very much for the hint. I had completely misunderstood where the bug in my code was.

Reducer

public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    private IntWritable date = new IntWritable();

    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
        String time = "";
        int ant10;

        date.set(date_uid.getDate());

        ArrayList<Time_Ant10> temp_list = new ArrayList<Time_Ant10>();
        for (Time_Ant10 time_ant10 : time_ant10s){
            time = time_ant10.getTimeStr();
            ant10 = time_ant10.getAnt10();
            temp_list.add(new Time_Ant10(time, ant10));
        }

        if(temp_list.size() >= 1){
            Time_Ant10[] temp_array = temp_list.toArray(new Time_Ant10[temp_list.size()]);
            context.write(date, new Time_Ant10s(temp_array));
        }
    }
}
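Besides being single-pass, the values Iterable that Hadoop passes to reduce() generally reuses one value object, deserializing each record into it. That is why this reducer stores new Time_Ant10(time, ant10) copies rather than adding time_ant10 itself: keeping the references would leave every list entry pointing at the same object, which holds the last record. The sketch below models only the reuse (not the single-pass limit) with hypothetical classes Holder and ReusingIterable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ReuseDemo {
    // Hypothetical mutable value, standing in for Time_Ant10
    static class Holder {
        int ant10;
        Holder() {}
        Holder(int ant10) { this.ant10 = ant10; }
    }

    // Hypothetical iterable that overwrites one shared Holder for every record
    static class ReusingIterable implements Iterable<Holder> {
        private final int[] records;
        ReusingIterable(int... records) { this.records = records; }

        @Override public Iterator<Holder> iterator() {
            Holder shared = new Holder();
            return new Iterator<Holder>() {
                int pos = 0;
                @Override public boolean hasNext() { return pos < records.length; }
                @Override public Holder next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    shared.ant10 = records[pos++]; // overwrite instead of allocating
                    return shared;
                }
            };
        }
    }

    static List<Integer> bufferReferences(Iterable<Holder> values) {
        List<Holder> kept = new ArrayList<>();
        for (Holder h : values) kept.add(h);                    // every entry is the same object
        List<Integer> out = new ArrayList<>();
        for (Holder h : kept) out.add(h.ant10);
        return out;
    }

    static List<Integer> bufferCopies(Iterable<Holder> values) {
        List<Holder> kept = new ArrayList<>();
        for (Holder h : values) kept.add(new Holder(h.ant10));  // copy, as the fixed reducer does
        List<Integer> out = new ArrayList<>();
        for (Holder h : kept) out.add(h.ant10);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(bufferReferences(new ReusingIterable(3, 5, 8))); // [8, 8, 8]
        System.out.println(bufferCopies(new ReusingIterable(3, 5, 8)));     // [3, 5, 8]
    }
}
```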

Time_Ant10s

public static class Time_Ant10s extends ArrayWritable {
    public Time_Ant10s(){
        super(Time_Ant10.class);
    }

    public Time_Ant10s(Time_Ant10[] time_ant10s){
        super(Time_Ant10.class, time_ant10s);
    }

    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    @Override
    public String toString() {
        int time, ant10;
        Time_Ant10[] time_ant10s = get();
        String output = "";

        for(Time_Ant10 time_ant10: time_ant10s){
            time = time_ant10.getTimeInt();
            ant10 = time_ant10.getAnt10();
            output += time + "," + ant10 + ",";
        }

        return output;
    }
}
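A related caveat in Time_Ant10s: the get() override casts super.get() to Time_Ant10[]. That works here because the reducer passes a real Time_Ant10[] to the constructor, but ArrayWritable.readFields() allocates a plain Writable[], so the cast would throw ClassCastException if a Time_Ant10s were ever deserialized (for example, read back by a later job). The sketch below shows the same array-cast behavior in plain Java, using hypothetical types Item (for Writable) and TimeAnt (for Time_Ant10), together with a toString-style join that casts each element instead and uses a StringBuilder rather than repeated string concatenation. It keeps the original output format, trailing comma included.

```java
class ToStringSketch {
    interface Item {}                        // hypothetical stand-in for Writable

    static class TimeAnt implements Item {   // hypothetical stand-in for Time_Ant10
        final int timeInt, ant10;
        TimeAnt(int timeInt, int ant10) { this.timeInt = timeInt; this.ant10 = ant10; }
    }

    // Iterates the base-type array and casts each element, so it works whether
    // the array was created as TimeAnt[] or as Item[] (as after deserialization)
    static String join(Item[] values) {
        StringBuilder sb = new StringBuilder();
        for (Item v : values) {
            TimeAnt t = (TimeAnt) v;
            sb.append(t.timeInt).append(',').append(t.ant10).append(',');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Item[] plain = new Item[] { new TimeAnt(80509, 3), new TimeAnt(81200, 5) };
        System.out.println(join(plain)); // 80509,3,81200,5,

        try {
            TimeAnt[] cast = (TimeAnt[]) plain;  // what the get() override does
            System.out.println(cast.length);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

In Time_Ant10s itself, the equivalent would be looping over super.get() as Writable values and casting each element to Time_Ant10.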
