当你将reducer的数量设置为大于1时,Partitioner总是工作,即使它是一个单节点集群。
我已经在单节点集群上测试了以下代码,它按预期工作:
public final class SortMapReduce extends Configured implements Tool {
public static void main(final String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SortMapReduce(), args);
System.exit(res);
}
public int run(final String[] args) throws Exception {
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
job.setJarByClass(SortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setNumReduceTasks(5);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static class Map extends Mapper<Text, Text, Person, Text> {
private Person outputKey = new Person();
@Override
protected void map(Text pointID, Text firstName, Context context) throws IOException, InterruptedException {
outputKey.set(pointID.toString(), firstName.toString());
context.write(outputKey, firstName);
}
}
public static class Reduce extends Reducer<Person, Text, Text, Text> {
Text pointID = new Text();
@Override
public void reduce(Person key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
pointID.set(key.getpointID());
for (Text firstName : values) {
context.write(pointID, firstName);
}
}
}
}
分区类:
public class PersonNamePartitioner extends Partitioner<Person, Text> {
@Override
public int getPartition(Person key, Text value, int numPartitions) {
return Math.abs(key.getpointID().hashCode() * 127) % numPartitions;
}
}
运行命令:
hadoop jar /home/hdfs/SecondarySort.jar org.test.SortMapReduce /demo/data/Customer/acct.txt /demo/data/Customer/output2
谢谢,