【问题标题】:Extend SequenceFileInputFormat to include file name+offset扩展 SequenceFileInputFormat 以包含文件名+偏移量
【发布时间】:2013-09-05 17:52:41
【问题描述】:

我希望能够创建一个自定义 InputFormat 来读取序列文件,但另外公开文件路径和该文件中记录所在的偏移量。

退一步说,这里是用例:我有一个包含可变大小数据的序列文件。键大多是不相关的,值最多为几兆字节,包含各种不同的字段。我想在 elasticsearch 中索引其中一些字段以及文件名和偏移量。这样我就可以从elasticsearch中查询到那些字段,然后使用文件名和偏移量回到sequence文件中获取原始记录,而不是把整个东西存入ES。

我将整个过程作为一个单独的 java 程序运行。 SequenceFile.Reader 类方便地提供了 getPosition 和 seek 方法来实现这一点。

但是,最终会涉及到许多 TB 的数据,因此我需要将其转换为 MapReduce 作业(可能仅限 Map)。由于序列文件中的实际键无关紧要,我希望采用的方法是创建一个自定义 InputFormat 扩展或以某种方式利用 SequenceFileInputFormat,但不是返回实际键,而是返回由文件名和偏移量组成的复合键。

但是,事实证明这在实践中更加困难。看起来应该是可能的,但考虑到实际的 API 和暴露的内容,这很棘手。有任何想法吗?也许我应该采取另一种方法?

【问题讨论】:

    标签: java hadoop mapreduce sequencefile


    【解决方案1】:

    如果有人遇到类似的问题,这是我想出的解决方案。我最终只是简单地复制了 SequenceFileInputFormat/RecordReader 中的一些代码并对其进行了修改。我曾希望写一个子类或装饰器或其他东西......这种方式并不漂亮,但它有效:

    SequenceFileOffsetInputFormat.java:

    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    
    /**
     * InputFormat that reads sequence files but, instead of the stored key, emits a
     * composite {@link PathOffsetWritable} key of (file path, byte offset). The offset
     * is the position of the record within the file, so the original record can later
     * be re-read directly with {@code SequenceFile.Reader.seek(offset)}.
     *
     * <p>Split computation is delegated to {@link SequenceFileInputFormat} so that
     * split boundaries respect sequence-file sync points, exactly as the stock reader
     * does.
     *
     * @param <V> value type stored in the sequence files
     */
    public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {

        private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {

            private SequenceFile.Reader in;
            private long start;
            private long end;
            private boolean more = true;
            private PathOffsetWritable key = null;
            // Scratch instance for the stored key: it must be read to advance the
            // stream, but its contents are deliberately discarded.
            private Writable k = null;
            private V value = null;
            private Configuration conf;

            /**
             * Opens the reader, instantiates key/value holders reflectively, and
             * positions the stream at the first sync point inside this split.
             *
             * @throws IOException if the file cannot be opened, the key/value classes
             *         cannot be instantiated, or seeking to the split start fails
             */
            @Override
            @SuppressWarnings("unchecked") // value class comes from file metadata; cast to V is by construction
            public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) split;
                conf = context.getConfiguration();
                Path path = fileSplit.getPath();
                FileSystem fs = path.getFileSystem(conf);
                this.in = new SequenceFile.Reader(fs, path, conf);
                boolean ok = false;
                try {
                    // getDeclaredConstructor().newInstance() replaces the deprecated
                    // Class.newInstance(), which silently propagates checked exceptions.
                    this.k = (Writable) in.getKeyClass().getDeclaredConstructor().newInstance();
                    this.value = (V) in.getValueClass().getDeclaredConstructor().newInstance();

                    this.end = fileSplit.getStart() + fileSplit.getLength();

                    // Advance to the first sync point at or after the split start so we
                    // never emit a record that belongs to the previous split.
                    if (fileSplit.getStart() > in.getPosition()) {
                        in.sync(fileSplit.getStart());
                    }

                    this.start = in.getPosition();
                    more = start < end;

                    key = new PathOffsetWritable(path, start);
                    ok = true;
                } catch (ReflectiveOperationException e) {
                    throw new IOException("cannot instantiate key/value class", e);
                } finally {
                    // Don't leak the open reader if initialization failed part-way.
                    if (!ok) {
                        in.close();
                    }
                }
            }

            /**
             * Reads the next record, recording its byte offset in the key.
             *
             * @return true if a record within this split was read
             */
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (!more) {
                    return false;
                }
                // Position BEFORE the read is the offset the record starts at.
                long pos = in.getPosition();

                more = in.next(k, value);
                // A record starting at or past this split's end, after a sync marker,
                // belongs to the next split — stop without emitting it.
                if (!more || (pos >= end && in.syncSeen())) {
                    key = null;
                    value = null;
                    more = false;
                } else {
                    key.setOffset(pos);
                }
                return more;
            }

            @Override
            public PathOffsetWritable getCurrentKey() {
                return key;
            }

            @Override
            public V getCurrentValue() {
                return value;
            }

            /** Fraction of the split consumed, based on stream position. */
            @Override
            public float getProgress() throws IOException, InterruptedException {
                if (end == start) {
                    return 0.0f;
                } else {
                    return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
                }
            }

            @Override
            public void close() throws IOException {
                // Guard: close() may be called even if initialize() never completed.
                if (in != null) {
                    in.close();
                }
            }

        }

        @Override
        public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new SequenceFileOffsetRecordReader<V>();
        }

        /** Delegates split computation so sync-point handling matches the stock format. */
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
            return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
        }

        /** Splits must be at least one sync interval so every split contains a sync point. */
        @Override
        public long getFormatMinSplitSize() {
            return SequenceFile.SYNC_INTERVAL;
        }


    }
    

    PathOffsetWritable.java:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Composite key of (file path, byte offset) identifying a single record inside a
     * sequence file, so the record can later be re-read with
     * {@code SequenceFile.Reader.seek(offset)}.
     *
     * <p>Ordering is by path first, then offset; {@link #equals(Object)} and
     * {@link #hashCode()} are consistent with {@link #compareTo(PathOffsetWritable)}.
     */
    public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {

        // Reusable buffer for (de)serializing the path as text.
        private Text t = new Text();
        private Path path;
        private long offset;

        /**
         * No-arg constructor required by Hadoop, which instantiates Writables
         * reflectively during deserialization (e.g. in the shuffle); fields are
         * populated afterwards via {@link #readFields(DataInput)}.
         */
        public PathOffsetWritable() {
        }

        public PathOffsetWritable(Path path, long offset) {
            this.path = path;
            this.offset = offset;
        }

        public Path getPath() {
            return path;
        }

        public long getOffset() {
            return offset;
        }

        public void setPath(Path path) {
            this.path = path;
        }

        public void setOffset(long offset) {
            this.offset = offset;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            t.readFields(in);
            path = new Path(t.toString());
            offset = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            t.set(path.toString());
            t.write(out);
            out.writeLong(offset);
        }

        /** Orders by path, then by offset. */
        @Override
        public int compareTo(PathOffsetWritable o) {
            int x = path.compareTo(o.path);
            if (x != 0) {
                return x;
            }
            // Long.compare avoids the boxing of Long.valueOf(..).compareTo(..).
            return Long.compare(offset, o.offset);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PathOffsetWritable)) {
                return false;
            }
            PathOffsetWritable other = (PathOffsetWritable) obj;
            return offset == other.offset
                    && (path == null ? other.path == null : path.equals(other.path));
        }

        @Override
        public int hashCode() {
            int result = (path == null) ? 0 : path.hashCode();
            return 31 * result + Long.hashCode(offset);
        }

        @Override
        public String toString() {
            return path + "@" + offset;
        }


    }
    

    【讨论】:

      猜你喜欢
      • 2011-03-24
      • 1970-01-01
      • 2016-08-25
      • 1970-01-01
      • 2016-09-02
      • 1970-01-01
      • 2022-10-15
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多