【问题标题】:Custom RecordReader initialize not called未调用自定义 RecordReader 初始化
【发布时间】:2014-03-18 07:40:27
【问题描述】:

我最近开始使用 Hadoop,刚刚创建了自己的输入格式来处理 pdf。

由于某种原因,我的自定义 RecordReader 类没有调用它的初始化方法。 (用sysout检查了一下,因为我没有设置调试环境)

我在 Windows 7 32 位上运行 hadoop 2.2.0。用 yarn jar 打电话,因为 hadoop jar 在 windows 下被窃听...

import ...

public class PDFInputFormat extends FileInputFormat<Text, Text>
{


        @Override
        public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
                JobConf arg1, Reporter arg2) throws IOException 
                {
                    return new PDFRecordReader();
                }

        public static class PDFRecordReader implements RecordReader<Text, Text>
        {

            private FSDataInputStream fileIn;
            public String fileName=null;
            HashSet<String> hset=new HashSet<String>();

            private Text key=null;
            private Text value=null;

            private byte[] output=null;
            private int position = 0;

            @Override
            public Text createValue() {
                int endpos = -1;
                for (int i = position; i < output.length; i++){
                    if (output[i] == (byte) '\n') {
                        endpos = i;
                    }
                }
                if (endpos == -1) {
                    return new Text(Arrays.copyOfRange(output,position,output.length));
                }
                return new Text(Arrays.copyOfRange(output,position,endpos));
            }

            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext job) throws
            IOException, InterruptedException
            {
                System.out.println("initialization is called");
                FileSplit split=(FileSplit) genericSplit;
                Configuration conf=job.getConfiguration();

                Path file=split.getPath();
                FileSystem fs=file.getFileSystem(conf);
                fileIn= fs.open(split.getPath());

                fileName=split.getPath().getName().toString();

                System.out.println(fileIn.toString());

                PDDocument docum = PDDocument.load(fileIn);

                ByteArrayOutputStream boss = new ByteArrayOutputStream();
                OutputStreamWriter ow = new OutputStreamWriter(boss);

                PDFTextStripper stripper=new PDFTextStripper();
                stripper.writeText(docum, ow);
                ow.flush();

                output = boss.toByteArray();

            }
        }


}

【问题讨论】:

    标签: java hadoop mapreduce recordreader


    【解决方案1】:

    我昨晚想通了,我可能会帮助其他人:

    RecordReader 是一个废弃的 Hadoop 接口 (hadoop.common.mapred),它实际上不包含初始化方法,这就解释了为什么它不会被自动调用。

    在 hadoop.common.mapreduce 中扩展 RecordReader 类确实可以扩展该类的初始化方法。

    【讨论】:

      【解决方案2】:

      System.out.println() 在运行作业时可能无济于事。为了确保您的initialize() 被调用或不尝试在下面抛出一些RuntimeException

       @Override
                  public void initialize(InputSplit genericSplit, TaskAttemptContext job) throws
                  IOException, InterruptedException
                  {
                     throw new NullPointerException("inside initialize()");
                     ....
      

      这肯定行。

      【讨论】:

      • 很公平,虽然我确实在同一类的其他方法中使用了 sysout,所以我希望它也能正常工作。
      猜你喜欢
      • 1970-01-01
      • 2011-08-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-09
      • 1970-01-01
      相关资源
      最近更新 更多