下面是一个 MapReduce 解决方案:
将 2 个或多个文件放在同一个目录中(即输入目录 arg1),该作业会把所有文件合并成一个满足您全部要求的文件。对于同一个键(col1+col2)下不匹配的行,它还会比较从 col3 到最后一列的内容;更多信息请参阅代码中的注释……
public class FileCompare extends Configured implements Tool{
public static class FileComapreMapper extends Mapper<Object, Text, Text, Text> {
int lineno=0;
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
try{
lineno++;
System.out.println(lineno + " -> " + value);
//skip header - uncomment this line to include header in output
if(lineno == 1) return;
String[] fields = value.toString().split("\\s+");//assuming input recs are whitespace seperated
String col1_col2 = fields[0] + "," + fields[1]; //key
String col3tolast="";
for(int i=2; i < fields.length;i++)
col3tolast+=fields[i] + ","; //values
col3tolast=col3tolast.substring(0, col3tolast.length()-1); //remove last char(',')
context.write(new Text(col1_col2), new Text(col3tolast)); //send key, value pairs to reducer
}catch(Exception e){
System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
}
}
}
public static class FileComapreReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//Get unique col3 to last value
Set<Text> uniqueCol3tolast = new HashSet<Text>();
for(Text record : values)
uniqueCol3tolast.add(record);
//write key + value
for(Text col3tolast:uniqueCol3tolast) //outputing tab delimited recs
context.write(new Text(key.toString().replaceAll(",", "\t")),
new Text(col3tolast.toString().replaceAll(",", "\t")));
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new FileCompare(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "merge-two-files");
job.setJarByClass(FileCompare.class);
job.setMapperClass(FileComapreMapper.class);
job.setReducerClass(FileComapreReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileSystem fs = null;
Path dstFilePath = new Path(args[1]);
try {
fs = dstFilePath.getFileSystem(conf);
if (fs.exists(dstFilePath))
fs.delete(dstFilePath, true);
} catch (IOException e1) {
e1.printStackTrace();
}
return job.waitForCompletion(true) ? 0 : 1;
}
}