【问题标题】:file comparison in hadoop using mapreduce使用mapreduce在hadoop中进行文件比较
【发布时间】:2017-01-19 16:01:20
【问题描述】:

一个.txt

col1       col2    col3    col4    col5
A           120     140     160     180
B           200     220     240     260
D           400     420     440     460

B.txt

col1      col2    col3    col4    col5
A          110     140     160     180
B          200     220     240     260
C          600     620     640     660

输出文件

A          120     140     160     180
A          110     140     160     180
B          200     220     240     260
D          400     420     440     460
C          600     620     640     660

1) col1 和 col2 是主键,其中任何键被更改,然后我们显示两条记录,如

in A.txt contain 1st Records:- A          120      140     160     180
in B.txt contain 1st Records:- A          110      140     160     180 

在这个 col2 中发生了变化,所以我必须显示两条记录

2) 如果两个文件的记录没有变化(我的意思是看起来一样),我们只需要显示一个记录

3) 在两个文件中显示所有其他记录

最终输出应该是这样的

输出文件

A          120     140     160     180
A          110     140     160     180
B          200     220     240     260
D          400     420     440     460
C          600     620     640     660

【问题讨论】:

    标签: hadoop mapreduce reduce


    【解决方案1】:

    使用 PIG。加载两个文件,合并记录,然后区分它。

    A = LOAD 'A.txt' USING PigStorage('\t');
    B = LOAD 'B.txt' USING PigStorage('\t');
    C = UNION A,B;
    D = DISTINCT C;
    DUMP D;
    

    【讨论】:

    • 问的问题是mapreduce,你用PIG回答?
    • 这怎么可能是错的! OP 没有提供任何代码来决定语言。
    • 我同意这正是 PIG 的用途,因此在这种情况下应该使用它(所以我 +1)。但是,我想知道为什么 OP 提到了主键。也许这意味着它只关心前两列的相等性,所以“distinct”并不完全正确。不过不确定...
    【解决方案2】:

    这里是mapreduce 解决方案:
    将 2 个或多个文件放在一个目录中(输入 - arg1),它会将所有文件与一个符合您所有要求的文件合并。它还匹配 col3 以结束一个键(col1+col2)的非 macthing 行,请参阅 cmets 了解更多信息...

    public class FileCompare  extends Configured implements Tool{
    
        public static class FileComapreMapper extends Mapper<Object, Text, Text, Text> {
            int lineno=0;
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
                try{
                    lineno++;
                    System.out.println(lineno + " -> " + value);
                    //skip header - uncomment this line to include header in output
                    if(lineno == 1) return; 
    
                    String[] fields = value.toString().split("\\s+");//assuming input recs are whitespace seperated
                    String col1_col2 = fields[0] + "," + fields[1]; //key
                    String col3tolast="";
                    for(int i=2; i < fields.length;i++)
                        col3tolast+=fields[i] + ","; //values
    
                   col3tolast=col3tolast.substring(0, col3tolast.length()-1); //remove last char(',')
                   context.write(new Text(col1_col2), new Text(col3tolast)); //send key, value pairs to reducer
                }catch(Exception e){
                    System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
                }
            }   
        }
    
        public  static class FileComapreReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) 
                    throws IOException, InterruptedException {
                //Get unique col3 to last value
                Set<Text> uniqueCol3tolast = new HashSet<Text>();
                for(Text record : values)
                    uniqueCol3tolast.add(record);
                //write key + value
                for(Text col3tolast:uniqueCol3tolast) //outputing tab delimited recs
                    context.write(new Text(key.toString().replaceAll(",", "\t")), 
                            new Text(col3tolast.toString().replaceAll(",", "\t")));     
            }
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new FileCompare(), args);
            System.exit(res);
        }
    
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: <in> <out>");
                System.exit(2);
            }
            Configuration conf = this.getConf();
            Job job = Job.getInstance(conf, "merge-two-files");
            job.setJarByClass(FileCompare.class);
            job.setMapperClass(FileComapreMapper.class);
            job.setReducerClass(FileComapreReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            FileSystem fs = null;
            Path dstFilePath = new Path(args[1]);
            try {
                fs = dstFilePath.getFileSystem(conf);
                if (fs.exists(dstFilePath))
                    fs.delete(dstFilePath, true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            return job.waitForCompletion(true) ? 0 : 1;
        } 
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-03-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多