【问题标题】:Sort data Hadoop Mapreduce排序数据 Hadoop Mapreduce
【发布时间】:2015-12-16 06:58:07
【问题描述】:

我有以下算法按字母顺序对数据进行排序

public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("amasort.case.sensitive", true);

    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
      word.set(line+"_"+key.toString());
      context.write(word, one);
      System.out.println("key:"+key.toString()+";value:"+value.toString());
      }
    }

  public static class ForwardReducer
       extends Reducer<Text,NullWritable,Text,NullWritable> {
    private NullWritable result = NullWritable.get();

    public void reduce(Text key, Iterable<NullWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {

      String originalWord = key.toString();
      originalWord = originalWord.substring(0, originalWord.lastIndexOf("_"));
      key.set(originalWord);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    Job job = Job.getInstance(conf, "word sort");
    job.setJarByClass(AmaSort.class);
    job.setMapperClass(LineMapper.class);
//    job.setCombinerClass(ForwardReducer.class);
    job.setReducerClass(ForwardReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

我尝试使用此算法对包含 (@xxxxxxx, 0,tcp,xx,1,1,1,2,4,5,....) 的 mydata 集进行排序,但输出所有以 @ 开头的行都被删除和数据行结构 0,tcp,x1x1,1,114,.... 被修改,我只想用这个特定的字符 (@) 对我的数据集进行排序,所有行在文件的第一个以 @ 开头,其余的保持相同的结构。 任何人都可以帮我修改这个算法吗?

【问题讨论】:

    标签: sorting hadoop mapreduce dataset


    【解决方案1】:

    您可以使用以下修改后的代码进行排序,

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class AmaSort
    {
        static Configuration conf = null;
        private static boolean caseSensitive;
        private static Text word = new Text();
    
        public static class LineMapper extends Mapper<Object, Text, Text, NullWritable>{
    
            public void setup(Context context) throws IOException, InterruptedException
            {
                conf = context.getConfiguration();
                caseSensitive = conf.getBoolean("amasort.case.sensitive", true);
    
            }
    
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException
            {
                String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
                word.set(line);
                context.write(word, NullWritable.get());
    
            }
        }
    
        public static class ForwardReducer extends Reducer<Text, NullWritable, Text, NullWritable>
        {
            private NullWritable result = NullWritable.get();
    
            public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
            {
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception
        {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
    //  Job job = Job.getInstance(conf, "word sort");
        Job job = new Job(conf, "word sort");
        job.setJarByClass(AmaSort.class);
        job.setMapperClass(LineMapper.class);
        // job.setCombinerClass(ForwardReducer.class);
        job.setReducerClass(ForwardReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
    
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
    
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
    

    【讨论】:

    • 谢谢@prashant 它有效但是有很多数据被删除了(我的原始数据集大小为 36MB,排序后的数据只有 3.6Mb),我在文件 0、tcp、xxxxx 的第一个中获得了数据。 0,1,111 然后是 'At' 属性....最后一个 'At' 数据我想要按这个顺序 'At'attribute 然后一行 'At' 日期最后剩下的数据 你能帮我解决这个问题吗?或者如果你想给我发你的电子邮件来互相联系
    • 当我们使用键进行排序时,默认情况下它将使用RowComparator对键进行排序,但如果您想使用自定义排序对它们进行排序,您可以实现自己的SortComparator请有看看链接,stackoverflow.com/questions/16184745/… 顺便说一下,你可以通过 prashant.n.khunt@gmail.com 联系我
    • 您能否也举一些从原始数据中删除的行的示例?并且大小可能会减少,因为从数据中删除了重复项。
    • 谢谢你的兄弟,我现在给你发邮件,希望我能尽快收到你的回放
    猜你喜欢
    • 2011-02-02
    • 2015-10-05
    • 2011-04-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多