【问题标题】:Mapreduce java heap space error during reduce phase在reduce阶段Mapreduce java堆空间错误
【发布时间】:2015-04-10 15:59:36
【问题描述】:

我有一个简单的 mapreduce 作业来构建 tfidf 索引,但是当 reducer 大约为70%。我尝试了不同的方法,使用各种结构,告诉我的工作在命令中使用更多内存并在更小的样本上运行我的工作,但没有任何改变。我的想法已经结束,所以我将不胜感激任何提示。

Mapper 产生正确的输出,但由于 java 堆空间错误,reducer 总是失败。

这是我正在运行的命令(我正在尝试指定使用的内存量):hadoop jar WordCountMPv1.jar -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --input /user/myslima3/wiki2 --output /user/myslima3/index

我的整个 mapreduce 代码:

public class Indexer extends Configured implements Tool {


    /*
     * Vocabulary: key = term, value = index
     */
    private static Map<String, Integer> vocab = new HashMap<String, Integer>();
    private static Map<String, Double> mapIDF = new HashMap<String, Double>();
    private static final int DOC_COUNT = 751300; // total number of documents

    public static void main(String[] arguments) throws Exception {
        System.exit(ToolRunner.run(new Indexer(), arguments));
    }

    public static class Comparator extends WritableComparator {
        protected Comparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }

    public static class IndexerMapper extends
            Mapper<Object, Text, IntWritable, Text> {
        private Text result = new Text();

        // load vocab from distributed cache
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            Path getPath = new Path(cacheFiles[0].getPath());

            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    fs.open(getPath)));
            String line = null;
            while ((line = bf.readLine()) != null) {
                StringTokenizer st = new StringTokenizer(line, " \t");

                int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
                String word = st.nextToken(); // second element is the term
                double IDF = Integer.parseInt(st.nextToken()); // third token is the DF

                // compute IDF
                IDF = (Math.log(DOC_COUNT / IDF));
                mapIDF.put(word, IDF);

                // save vocab
                vocab.put(word, index);

            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // init TF map
            Map<String, Integer> mapTF = new HashMap<String, Integer>();

            // parse input string
            StringTokenizer st = new StringTokenizer(value.toString(), " \t");

            // first element is doc index
            int index = Integer.parseInt(st.nextToken());
            //sb.append(index + "\t");

            // count term frequencies
            String word;
            while (st.hasMoreTokens()) {
                word = st.nextToken();

                // check if word is in the vocabulary
                if (vocab.containsKey(word)) {
                    if (mapTF.containsKey(word)) {
                        int count = mapTF.get(word);
                        mapTF.put(word, count + 1);
                    } else {
                        mapTF.put(word, 1);
                    }
                }
            }

            // compute TF-IDF
            double idf;
            double tfidf;
            int wordIndex;
            for (String term : mapTF.keySet()) {
                int tf = mapTF.get(term);

                if (mapIDF.containsKey(term)) {
                    idf = mapIDF.get(term);

                    tfidf = tf * idf;
                    wordIndex = vocab.get(term);

                    context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));
                }

            }               
        }
    }

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text>
    {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {

            // reset vocab and maps to reduce memory
            vocab = null;
            mapIDF = null;

            StringBuilder sb = new StringBuilder();

            for (Text value : values)
            {
                sb.append(value.toString() + " ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add distributed file with vocabulary
        DistributedCache
                .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(Comparator.class);


        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

感谢您的帮助!

编辑:堆栈跟踪

15/04/06 10:54:38 INFO mapreduce.Job:  map 0% reduce 0%
15/04/06 10:54:52 INFO mapreduce.Job:  map 25% reduce 0%
15/04/06 10:54:54 INFO mapreduce.Job:  map 31% reduce 0%
15/04/06 10:54:55 INFO mapreduce.Job:  map 50% reduce 0%
15/04/06 10:54:56 INFO mapreduce.Job:  map 55% reduce 0%
15/04/06 10:54:58 INFO mapreduce.Job:  map 58% reduce 0%
15/04/06 10:55:00 INFO mapreduce.Job:  map 63% reduce 0%
15/04/06 10:55:07 INFO mapreduce.Job:  map 69% reduce 0%
15/04/06 10:55:08 INFO mapreduce.Job:  map 82% reduce 0%
15/04/06 10:55:10 INFO mapreduce.Job:  map 88% reduce 0%
15/04/06 10:55:11 INFO mapreduce.Job:  map 96% reduce 0%
15/04/06 10:55:12 INFO mapreduce.Job:  map 100% reduce 0%
15/04/06 10:55:25 INFO mapreduce.Job:  map 100% reduce 29%
15/04/06 10:55:31 INFO mapreduce.Job:  map 100% reduce 36%
15/04/06 10:55:34 INFO mapreduce.Job:  map 100% reduce 48%
15/04/06 10:55:37 INFO mapreduce.Job:  map 100% reduce 61%
15/04/06 10:55:40 INFO mapreduce.Job:  map 100% reduce 68%
15/04/06 10:55:43 INFO mapreduce.Job:  map 100% reduce 71%
15/04/06 10:55:44 INFO mapreduce.Job: Task Id : attempt_1427101801879_0658_r_000000_0, Status : FAILED
Error: Java heap space

【问题讨论】:

  • 你能更新数据的大小和错误信息吗
  • 至于数据 - 我在几个文本文件中尝试了 1GB 和 300MB 的数据。错误消息说只是 FAIL: Java heap space when reducer get to 70& (在两个数据样本上...)
  • 您的集群规模如何?你为此付出了多少内存?
  • 我不确定。我没有关于集群的详细信息。但是还有一些其他 MapReduce 作业处理的数据远比这运行起来没有任何问题......
  • 并行运行??

标签: java hadoop mapreduce


【解决方案1】:

更仔细地查看附加在 reducer 中的 StringBuffer。您没有指定(我认为)默认为 16 的初始大小。随着它的增长,它需要将自身复制到一个越来越大的缓冲区中,因此最终的缓冲区长度为 16、32、48、64,... (不确定增长量,但你明白了)。无论如何,传入reducer 的大量值会导致使用大量内存,而垃圾收集可以处理大部分内存,直到StringBuffer 变得如此之大以至于无法增长。换句话说,这并不能很好地扩展。

鉴于这是您选择的算法,但是,我只能建议您尝试提供一个非常大的初始大小,看看您是否能幸运并强制增长恰好适合可用内存。

如果做不到这一点,您也许可以创建一个特殊的 OutputFormat,它能够在写入值时连接它们,并在键更改时创建一个新行,但我一直没有想到这一点。

【讨论】:

  • 感谢您的提示。我尝试使用一些初始容量(new StringBuilder(4096) 或更高)初始化 StringBuilder,但它没有解决内存问题
  • 是的,我认为问题在于您试图连接大量不适合的值。也许为每 1000 (?) 个值写一行,然后在 map reduce 之后遍历文件以合并多行条目?
  • 但是我从映射器传递的每个值都属于减速器中的另一行,我不能在映射阶段将它们放在一起。他们不属于一起......
【解决方案2】:

解决了我指定更多数量的减速器并实现组合器的问题。

【讨论】:

猜你喜欢
  • 2017-06-30
  • 2016-06-15
  • 2023-04-07
  • 1970-01-01
  • 2011-10-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多