【问题标题】:XML Parsing through HADOOP通过 HADOOP 解析 XML
【发布时间】:2015-10-26 19:25:17
【问题描述】:

我有大量需要分析的 WIKI 数据。这些转储基本上是 XML 文件。如何使用 HADOOP Map Reduce 解析 XML 文件?

【问题讨论】:

  • 你到底想做什么?解析然后对解析后的结果做什么?
  • 需要提取 XML 文件中存在的标签值。例如:- XML 文件是:-dfs.replication1dfs2 我需要获取属性标签中存在的标签的值。
  • 还有,为什么要使用 Hadoop 而不是 XStream 之类的东西?
  • @RanjanPradhan 请接受下面给出的答案。
  • 我又想到了一件事,当文件超过块大小时会发生什么?一个xml可以分成两个不同的块,如何处理?

标签: hadoop


【解决方案1】:
import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class XmlDriver
{

 public static class XmlInputFormat1 extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";


    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    /**
     * XMLRecordReader class to read through a given xml document to output
     * xml blocks as records as specified by the start tag and end tag
     *
     */

    public static class XmlRecordReader extends
    RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();

        private LongWritable key = new LongWritable();
        private Text value = new Text();
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
            FileSplit fileSplit = (FileSplit) split;

            // open the file and seek to the start of the split
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);

        }
        @Override
        public boolean nextKeyValue() throws IOException,
        InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            key.set(fsin.getPos());
                            value.set(buffer.getData(), 0,
                                    buffer.getLength());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }
        @Override
        public LongWritable getCurrentKey() throws IOException,
        InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException,
        InterruptedException {
            return value;
        }
        @Override
        public void close() throws IOException {
            fsin.close();
        }
        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
        throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                    if (b == -1)
                        return false;
                // save to buffer:
                    if (withinBlock)
                        buffer.write(b);
                // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length)
                            return true;
                    } else
                        i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
            }
        }
    }
}


public static class Map extends Mapper<LongWritable, Text,
Text, Text> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper.Context context)
    throws
    IOException, InterruptedException {
        String document = value.toString();
        System.out.println("‘" + document + "‘");
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new
                        ByteArrayInputStream(document.getBytes()));
            String propertyName = "";
            String propertyValue = "";
            String currentElement = "";
            while (reader.hasNext()) {
                int code = reader.next();
                switch (code) {
                case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:
                    currentElement = reader.getLocalName();
                    break;
                case XMLStreamConstants.CHARACTERS: //CHARACTERS:
                    if (currentElement.equalsIgnoreCase("name")) {
                        propertyName += reader.getText();
                        System.out.println("propertName"+propertyName);
                    } else if (currentElement.equalsIgnoreCase("value")) {
                        propertyValue += reader.getText();
                        System.out.println("propertyValue"+propertyValue);
                    }
                    break;
                }
            }
            reader.close();
            context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));

        }
        catch(Exception e){
            throw new IOException(e);

        }

    }
}
public static class Reduce
extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void setup(
            Context context)
    throws IOException, InterruptedException {
        context.write(new Text("<configuration>"), null);
    }

    @Override
    protected void cleanup(
            Context context)
    throws IOException, InterruptedException {
        context.write(new Text("</configuration>"), null);
    }

    private Text outputKey = new Text();
    public void reduce(Text key, Iterable<Text> values,
            Context context)
    throws IOException, InterruptedException {
        for (Text value : values) {
            outputKey.set(constructPropertyXml(key, value));

            context.write(outputKey, null);
        }
    }

    public static String constructPropertyXml(Text name, Text value) {
        StringBuilder sb = new StringBuilder();
        sb.append("<property><name>").append(name)
        .append("</name><value>").append(value)
        .append("</value></property>");
        return sb.toString();
    }
}



public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();

    conf.set("xmlinput.start", "<property>");
    conf.set("xmlinput.end", "</property>");
    Job job = new Job(conf);
    job.setJarByClass(XmlDriver.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(XmlDriver.Map.class);
    job.setReducerClass(XmlDriver.Reduce.class);

    job.setInputFormatClass(XmlInputFormat1.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }
}

参数:- /home/test.xml /home/out

Xml 文件(test.xml):

<configuration>
<property>
        <name>dfs.replication</name>
        <value>1</value>
 </property>
<property>
    <name>dfs</name>
    <value>2</value>
</property>
</configuration>

【讨论】:

  • 我在运行程序时遇到此错误.. java.lang.Exception: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext,但在 org. org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 上的 apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 原因:java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext,但在 xml.XmlRecordReader.initialize(XmlRecordReader.java:37) 中需要类
  • 我的问题在这里讨论和回答stackoverflow.com/questions/14354309/…
  • 上述代码如何处理单个记录的内容跨越2个拆分的文件拆分场景?
猜你喜欢
  • 2023-03-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多