[Title]: How to parse PDF files in map reduce programs?
[Posted]: 2013-12-24 10:09:46
[Question]:

I want to parse PDF files in my hadoop 2.2.0 program, and I found this. I followed what it says, and by now I have these three classes:

  1. PDFWordCount: the main class containing the map and reduce functions. (It is just like the native hadoop wordcount example, except that I use my PDFInputFormat class instead of TextInputFormat; a driver sketch follows this list.)
  2. PDFRecordReader extends RecordReader<LongWritable, Text>: which does the main work here. In particular, I have included my initialize function below for more explanation.

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
      System.out.println("initialize");
      System.out.println(genericSplit.toString());
      FileSplit split = (FileSplit) genericSplit;
      System.out.println("filesplit conversion has been done");
      final Path file = split.getPath();
      Configuration conf = context.getConfiguration();
      conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
      FileSystem fs = file.getFileSystem(conf);
      System.out.println("fs has been opened");
      start = split.getStart();
      end = start + split.getLength();
      System.out.println("going to open split");
      FSDataInputStream filein = fs.open(split.getPath());
      System.out.println("going to load pdf");
      PDDocument pd = PDDocument.load(filein);
      System.out.println("pdf has been loaded");
      PDFTextStripper stripper = new PDFTextStripper();
      in = new LineReader(new ByteArrayInputStream(
          stripper.getText(pd).getBytes("UTF-8")));
      start = 0;
      this.pos = start;
      System.out.println("init has finished");
    }
    

    (You can see my System.out.printlns for debugging.) This method fails to cast genericSplit to a FileSplit. The last thing I see in the console is:

    hdfs://localhost:9000/in:0+9396432
    

    which is the output of genericSplit.toString().

  3. PDFInputFormat extends FileInputFormat<LongWritable, Text>: which just creates a new PDFRecordReader in its createRecordReader method.
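
For context, here is a minimal sketch of what such a driver might look like. The WordCountMapper and WordCountReducer names are placeholders standing in for the stock WordCount mapper/reducer, not classes from the question:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PDFWordCount {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "pdf word count");
        job.setJarByClass(PDFWordCount.class);

        // The only change from the stock WordCount driver:
        // read input through the custom PDF input format.
        job.setInputFormatClass(PDFInputFormat.class);

        // Placeholder classes standing in for the usual WordCount pair.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }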

I would like to know what my mistake is.

Do I need extra classes?

[Comments]:

  • Don't you have some logs? Please add a stacktrace.
  • No, there is no Exception. It just dies right there.
  • That is rather unlikely, not even in your task logs? ;-)
  • OK, then either try adding print statements like System.out.println(genericSplit.getClass()) or hang it into a debugger. Are you submitting the job to a running cluster? If so, there must be some logs.
  • I don't know of any way to debug map reduce programs. If you do, please help me through this question. I have a hadoop single-node setup on localhost and run my tests on it. (One option is sketched right after this thread.)
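
On the debugging point above: one common option (an assumption here, not something confirmed in the thread) is to force the job into local mode, so the whole job runs in a single JVM where println output and debugger breakpoints are directly visible:

    import org.apache.hadoop.conf.Configuration;

    // Minimal sketch: run the job in-process via the LocalJobRunner.
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "local"); // Hadoop 2.x local mode
    conf.set("fs.defaultFS", "file:///");          // read from the local filesystem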

Tags: java pdf hadoop hadoop-yarn


[Solution 1]:

Reading a PDF is not that hard: you need to extend FileInputFormat as well as RecordReader. The FileInputFormat must not be allowed to split PDF files, because they are binary files.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PDFInputFormat extends FileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new PDFLineRecordReader();
  }

  // Never split PDF files, even when they are larger than the HDFS block size.
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

}

The RecordReader then performs the reading itself (I am using PDFBox to read the PDFs).

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.pdfbox.pdmodel.PDDocument;
// PDFBox 1.8.x; in PDFBox 2.x the stripper moved to org.apache.pdfbox.text.
import org.apache.pdfbox.util.PDFTextStripper;

public class PDFLineRecordReader extends RecordReader<Text, Text> {

  private Text key = new Text();
  private Text value = new Text();
  private int currentLine = 0;
  private List<String> lines = null;

  private PDDocument doc = null;
  private PDFTextStripper textStripper = null;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {

    FileSplit fileSplit = (FileSplit) split;
    final Path file = fileSplit.getPath();

    Configuration conf = context.getConfiguration();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream filein = fs.open(fileSplit.getPath());

    if (filein != null) {

      doc = PDDocument.load(filein);

      // Could the PDF be read?
      if (doc != null) {
        textStripper = new PDFTextStripper();
        String text = textStripper.getText(doc);

        lines = Arrays.asList(text.split(System.lineSeparator()));
        currentLine = 0;
      }
    }
  }

  // Returning false ends the reading process.
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {

    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }

    if (lines != null && currentLine < lines.size()) {
      String line = lines.get(currentLine);

      key.set(line);
      value.set("");
      currentLine++;

      return true;
    } else {
      // All lines read? -> end
      key = null;
      value = null;
      return false;
    }
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    // Fraction of the extracted lines consumed so far.
    return (lines == null || lines.isEmpty()) ? 0.0f
        : (float) currentLine / lines.size();
  }

  @Override
  public void close() throws IOException {
    // When done, close the document.
    if (doc != null) {
      doc.close();
    }
  }
}

Hope this helps!
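
With this input format, each map() call receives one line of extracted PDF text as its key and an empty value. A word-count mapper sitting on top of it could look like the following sketch (not part of the original answer):

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: counts words in the lines emitted by PDFLineRecordReader.
    // The input key/value types match the <Text, Text> input format above.
    public class PDFWordCountMapper extends Mapper<Text, Text, Text, IntWritable> {

      private final static IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        // The PDF line arrives as the key; the value is empty.
        StringTokenizer tokenizer = new StringTokenizer(key.toString());
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, ONE);
        }
      }
    }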

[Discussion]:

    [Solution 2]:
    package com.sidd.hadoop.practice.pdf;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    // Custom input/output formats from the author's own project (not shown in this answer).
    import com.sidd.hadoop.practice.input.pdf.PdfFileInputFormat;
    import com.sidd.hadoop.practice.output.pdf.PdfFileOutputFormat;
    
    public class ReadPdfFile {
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, LongWritable, Text> {
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
    //          context.progress();
                context.write(key, value);
            }
        }
    
        public static class MyReducer extends
                Reducer<LongWritable, Text, LongWritable, Text> {
            public void reduce(LongWritable key, Iterable<Text> values,
                               Context context) throws IOException, InterruptedException {
                if (values.iterator().hasNext()) {
                    context.write(key, values.iterator().next());
                } else {
                    context.write(key, new Text(""));
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
        Job job = Job.getInstance(conf, "Read Pdf"); // new Job(conf, ...) is deprecated in Hadoop 2.x
            job.setJarByClass(ReadPdfFile.class);
    
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
    
            job.setInputFormatClass(PdfFileInputFormat.class);
            job.setOutputFormatClass(PdfFileOutputFormat.class);
    
            removeDir(args[1], conf);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
        public static void removeDir(String path, Configuration conf) throws IOException {
            Path output_path = new Path(path);
            FileSystem fs = FileSystem.get(conf);
    
            if (fs.exists(output_path)) {
                fs.delete(output_path, true);
            }
        }   
    
    }
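
    The PdfFileInputFormat and PdfFileOutputFormat classes imported above are not shown in this answer. As a rough, hypothetical reconstruction (the real classes may differ), the input format would likely follow the same pattern as Solution 1: an unsplittable file format handing the work to a custom record reader.

        import java.io.IOException;

        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.InputSplit;
        import org.apache.hadoop.mapreduce.JobContext;
        import org.apache.hadoop.mapreduce.RecordReader;
        import org.apache.hadoop.mapreduce.TaskAttemptContext;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

        // Hypothetical reconstruction of the unshown PdfFileInputFormat.
        public class PdfFileInputFormat extends FileInputFormat<LongWritable, Text> {

          @Override
          public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
              TaskAttemptContext context) throws IOException, InterruptedException {
            // PdfFileRecordReader is a stand-in name for a reader analogous
            // to Solution 1's PDFLineRecordReader, with LongWritable keys.
            return new PdfFileRecordReader();
          }

          @Override
          protected boolean isSplitable(JobContext context, Path filename) {
            return false; // PDFs are binary; never split them
          }
        }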
    

    [Discussion]:

    • While your code may answer the question, it is better to provide more information about it, for example why you chose this approach.