【发布时间】:2016-04-26 05:03:26
【问题描述】:
我已经定义了 2 个映射器来分别处理 2 个文件。
第一个映射器（customerMapper）的输出键值类型为 (IntWritable, StationObj)；第二个映射器（countryMapper）的输出键值类型为 (IntWritable, Text)。
在驱动程序代码中，我使用 MultipleInputs 类的 addInputPath() 为两个输入路径分别注册了这两个映射器。
运行 jar 时，出现了如下 Type mismatch 错误：
16/04/24 18:40:28 INFO mapreduce.Job: Task Id : attempt_1461435780053_0008_m_000001_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected hadoop.StationObj, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1077)
下面是代码：
public static class customerMapper extends Mapper<LongWritable,Text,IntWritable,StationObj>
{
IntWritable outkey=new IntWritable();
StationObj outvalue=new StationObj();
//2,Russia,Jhonson,10000
public void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException
{
String []cols=values.toString().split(",");
outkey.set(Integer.parseInt(cols[0]));
outvalue.setAmount(Integer.parseInt(cols[3]));
outvalue.setCountry(cols[1]);
outvalue.setProduct(cols[2]);
context.write(outkey, outvalue);
}
}
public static class countryMapper extends Mapper<LongWritable,Text,IntWritable,Text>
{
IntWritable outkey=new IntWritable();
Text outvalue=new Text();
public void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException
{
String []cols=values.toString().split(",");
outkey.set(Integer.parseInt(cols[0]));
outvalue.set(cols[1]);
context.write(outkey,outvalue);
}
}
/**
 * Configures and runs the join job over two inputs read by different mappers.
 *
 * @param args {@code args[0]} = input path handled by customerMapper, {@code args[1]} = input path
 *     handled by countryMapper, {@code args[2]} = output directory
 * @throws IOException if job submission or file-system access fails
 * @throws ClassNotFoundException if a job class cannot be loaded
 * @throws InterruptedException if interrupted while waiting for the job
 */
public static void main(String[] args)
    throws IOException, ClassNotFoundException, InterruptedException {
  if (args.length < 3) {
    System.err.println("Usage: stationRedJoin <customerInput> <countryInput> <outputDir>");
    System.exit(2);
  }
  Configuration conf = new Configuration();
  // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
  Job job = Job.getInstance(conf, "dsddd");
  job.setJarByClass(stationRedJoin.class);
  // A job has exactly one map-output key/value class; every mapper registered below must emit
  // IntWritable keys and StationObj values or the task fails with "Type mismatch ... from map".
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(StationObj.class);
  MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, customerMapper.class);
  MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, countryMapper.class);
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  // Conventional process exit status: 0 on success, non-zero on failure.
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
【问题讨论】: