【问题标题】:Hadoop MapReduce sort reduce output using the keyHadoop MapReduce 使用键排序减少输出
【发布时间】:2013-06-16 14:30:52
【问题描述】:

下面有一个 map-reduce 程序计算几个文本文件的单词。 我的目标是按照出现次数的降序排列结果。

不幸的是,该程序按关键字按字典顺序对输出进行排序。我想要整数值的自然顺序。

所以我添加了一个带有job.setSortComparatorClass(IntComparator.class) 的自定义比较器。但这并没有按预期工作。我收到以下异常:

java.lang.Exception: java.nio.BufferUnderflowException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355)
    at WordCount$IntComparator.compare(WordCount.java:128)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:987)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:100)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:64)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1277)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1174)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:609)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

任何帮助将不胜感激! :)

我在下面列出了整个程序,因为可能有一个我显然不知道的异常原因。如您所见,我正在使用新的 mapreduce api (org.apache.hadoop.mapreduce.*)。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Counts the words in several text files.
 */
public class WordCount {
  /**
   * Maps lines of text to (word, amount) pairs.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable amount = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String textLine = value.toString();

      StringTokenizer tokenizer = new StringTokenizer(textLine);
      while (tokenizer.hasMoreElements()) {
        word.set((String) tokenizer.nextElement());

        context.write(word, amount);
      }
    }

  }

  /**
   * Reduces (word, amount) pairs to (amount, word) list.
   */
  public static class Reduce extends
      Reducer<Text, IntWritable, IntWritable, Text> {

    private IntWritable amount = new IntWritable();
    private int sum;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> valueList,
        Context context) throws IOException, InterruptedException {
      sum = 0;

      for (IntWritable value : valueList) {
        sum += value.get();
      }

      amount.set(sum);
      context.write(amount, key);
    }
  }

  public static class IntComparator extends WritableComparator {
    public IntComparator() {
      super(IntWritable.class);
    }

    private Integer int1;
    private Integer int2;

    @Override
    public int compare(byte[] raw1, int offset1, int length1, byte[] raw2,
        int offset2, int length2) {
      int1 = ByteBuffer.wrap(raw1, offset1, length1).getInt();
      int2 = ByteBuffer.wrap(raw2, offset2, length2).getInt();

      return int2.compareTo(int1);
    }

  }

  /**
   * Job configuration.
   * 
   * @param args
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public static void main(String[] args) throws IOException,
      ClassNotFoundException, InterruptedException {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration configuration = new Configuration();
    configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    Job job = new Job(configuration);
    job.setJobName("WordCount");
    job.setJarByClass(WordCount.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setSortComparatorClass(IntComparator.class);

    FileInputFormat.setInputPaths(job, inputPath);

    FileSystem.get(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.waitForCompletion(true);
  }
}

【问题讨论】:

  • 您介意解释一下字符串的natural order of the integer value 是什么吗?目前您正在尝试比较字符串中的前 4 个字节(wtf?)。
  • 不是字符串,IntWritable,这是关键。
  • 但是你的键是文本,没有 int 可以在任何地方排序。
  • reducer 的输出使用 IntWritable 作为 key 类型。请参阅context.write(amount, key);,其中数量是 IntWritable。
  • reducer 之后不会发生排序。

标签: java sorting hadoop mapreduce comparator


【解决方案1】:

比较器步骤发生在MapperReducer 之间,当您在Reducer 本身中交换键和值时,这对您不起作用。

如果键是 IntWritable,默认的 WritableComparator 通常会处理您的数字排序,除非它得到一个 Text 键,从而导致字典顺序。

至于为什么最后的输出没有按您写出的IntWritable 键排序,我不确定。也许这与TextOutputFormat 的工作方式有关?您可能需要深入研究 TextOutputFormat 源代码以获取相关线索,但简而言之,恐怕在这里设置排序比较器可能对您没有帮助。

【讨论】:

  • 也就是说实际上应该使用IntWritable的比较功能进行排序?
【解决方案2】:

正如quetzalcoatl 所说,您的比较器没有用,因为它在 Map 和 reduce 阶段之间使用,而不是在 Reduce 阶段之后使用。因此,要完成此操作,您需要对 cleanupReducer 进行排序,或者编写另一个程序来对减速器的输出进行排序。

【讨论】:

    【解决方案3】:

    基本上,您需要按值排序。有两种方法可以实现这一点。但简而言之,您需要 2 个 map-reduce,即在第一个 Map reduce 的输出上再运行一个 map reduce。

    在完成 normal map-reduce 后,再做一个 map reduce,您将第一个 map reduce 的输出作为第二个 map reduce 的输入。在第二个 map reduce 的 map 阶段,您可以使用自定义类作为键,例如 class WordCountVo implements WritableComparable&lt;WordCountVo&gt; 你必须覆盖 public int compareTo(WordCountVo wodCountVo) 方法。 在 WordCountVO 中,您可以同时保留字数和计数,但仅根据计数进行比较。例如。下面是 WordCountVO 的成员变量

    private String word;
    private Long count;
    

    现在,当您在第二个 reducer 中收到键值对时,您的数据将全部按值排序。您需要做的就是使用上下文编写键值对!希望这可以帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-02-02
      • 1970-01-01
      • 2015-10-05
      • 1970-01-01
      • 1970-01-01
      • 2014-03-31
      • 1970-01-01
      相关资源
      最近更新 更多