[Question Title]: Hadoop MapReduce chain with ArrayWritable
[Posted]: 2016-04-13 16:55:12
[Question]:

I am trying to create a MapReduce chain made of two steps. The first reducer emits (key, value) pairs where the value is a list of custom objects, and the second mapper should read the output of the first reducer. The list is a custom ArrayWritable. Here is the relevant code:

Custom object:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class Custom implements Writable {
    private Text document;
    private IntWritable count;

    public Custom(){
        setDocument("");
        setCount(0);
    }

    public Custom(String document, int count) {
        setDocument(document);
        setCount(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        document.readFields(in);
        count.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        document.write(out);
        count.write(out);
    }

    @Override
    public String toString() {
        return this.document.toString() + "\t" + this.count.toString();
    }

    public int getCount() {
        return count.get();
    }

    public void setCount(int count) {
        this.count = new IntWritable(count);
    }

    public String getDocument() {
        return document.toString();
    }

    public void setDocument(String document) {
        this.document = new Text(document);
    }
}

Custom ArrayWritable:

class MyArrayWritable extends ArrayWritable {
    public MyArrayWritable(Writable[] values) {
        super(Custom.class, values);
    }

    public MyArrayWritable() {
        super(Custom.class);
    }

    @Override
    public Custom[] get() {
        // super.get() returns a Writable[]; copy into a Custom[] so this also
        // works for instances rebuilt by readFields(), not just the constructor.
        Writable[] raw = super.get();
        return Arrays.copyOf(raw, raw.length, Custom[].class);
    }

    @Override
    public String toString() {
      return Arrays.toString(get());
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        super.write(arg0);
    }
}

First reducer:

public static class NGramReducer extends Reducer<Text, Text, Text, MyArrayWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //other code
        context.write(key, mArrayWritable);
    }
}

Second mapper:

public static class SecondMapper extends Mapper<Text, MyArrayWritable, Text, IntWritable> {
    private StringBuilder docBuilder= new StringBuilder();

    public void map(Text key, MyArrayWritable value, Context context) throws IOException, InterruptedException {
        //whatever code
    }
}

And this is the setup in main:

    //...
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(MyArrayWritable.class);
    job1.setInputFormatClass(WholeFileInputFormat.class);
    FileInputFormat.addInputPath(job1, new Path(args[2]));
    FileOutputFormat.setOutputPath(job1, TEMP_PATH);
    //...
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job2, TEMP_PATH);
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));
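
For reference, the chaining itself is normally done by running the jobs back to back in the driver, starting job 2 only after job 1 has finished. A rough sketch under the assumption that the driver class is Detector; the job names and the elided setup lines are placeholders, not code from the question:

    Job job1 = Job.getInstance(new Configuration(), "job1");
    job1.setJarByClass(Detector.class);
    // ... mapper/reducer classes, output types, input/output paths as above ...
    if (!job1.waitForCompletion(true)) {   // block until job 1 completes
        System.exit(1);
    }

    Job job2 = Job.getInstance(new Configuration(), "job2");
    job2.setJarByClass(Detector.class);
    // ... second mapper/reducer setup and paths as above ...
    System.exit(job2.waitForCompletion(true) ? 0 : 1);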

When I run it I get this error:
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to Detector$MyArrayWritable

What is the problem? Do I have to write a custom FileInputFormat? (job1 works fine)

[Comments]:

    Tags: java hadoop mapreduce writable


    [Solution 1]:

    It looks like this is because your job 2 InputFormat, KeyValueTextInputFormat.class, expects a key and a value that are both Text objects. Since your job 1 outputs (Text, MyArrayWritable), there is a conflict with the value.

    Luckily you don't have to write a custom OutputFormat to cater for your data! Simply write your job 1 output to sequence files, which keeps the data in binary form:

    //...
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(MyArrayWritable.class);
    job1.setInputFormatClass(WholeFileInputFormat.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job1, new Path(args[2]));
    SequenceFileOutputFormat.setOutputPath(job1, TEMP_PATH);
    //...
    job2.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job2, TEMP_PATH);
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));
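
    With job 1 writing a SequenceFile, job 2's mapper receives the original writable types directly, so the SecondMapper signature from the question (Mapper<Text, MyArrayWritable, Text, IntWritable>) works as-is. A minimal sketch of what its body might look like; the count-emitting logic is only an illustrative assumption, not code from the question:

    public static class SecondMapper extends Mapper<Text, MyArrayWritable, Text, IntWritable> {
        @Override
        public void map(Text key, MyArrayWritable value, Context context)
                throws IOException, InterruptedException {
            // SequenceFileInputFormat deserializes the exact (Text, MyArrayWritable) pairs
            // that job 1 wrote, so no text parsing is needed here.
            for (Custom custom : value.get()) {
                // Illustrative only: re-emit each document together with its count.
                context.write(new Text(custom.getDocument()), new IntWritable(custom.getCount()));
            }
        }
    }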
    

    [Discussion]:

    • I modified the input and output formats, but now I get this error: 'Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: Detector$MyArrayWritable.<init>()'
    • stackoverflow.com/questions/11446635/… seems to answer that question
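    • A sketch of the likely fix for that exception, which the linked question appears to cover: Hadoop instantiates Writables by reflection, so a Writable declared as a nested class must be public static and keep a no-argument constructor, otherwise the <init>() lookup fails. Assuming MyArrayWritable is nested inside Detector:

    public class Detector {
        // A non-static inner class carries a hidden reference to the outer instance,
        // so reflection cannot call a plain no-arg constructor on it; "static" fixes that.
        public static class MyArrayWritable extends ArrayWritable {
            public MyArrayWritable() {
                super(Custom.class);
            }

            public MyArrayWritable(Writable[] values) {
                super(Custom.class, values);
            }
        }
        // ... mappers, reducers and main() as above ...
    }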