【问题标题】:Many input files to SINGLE map. Hadoop. how?许多输入文件到 SINGLE 映射。 Hadoop。如何?
【发布时间】:2015-04-16 08:16:42
【问题描述】:

如何对多个输入文件只使用一张地图?因为 Hadoop 为一个文件创建了一个映射器。我只需要一个映射器来处理所有文件。

我尝试使用CombineFileInputFormat。它有一个映射器,但映射输入只包含一个文件。我需要输入映射值来包含来自所有文件(文本格式)的数据,如下所示:

输入地图值:

来自file1.txt的数据
来自 file2.txt 的数据
来自 file3.txt 的数据

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

public WholeFileInputFormat() {
    super();
    setMaxSplitSize(67108864);
}

@Override
protected boolean isSplitable(JobContext context, Path file) {
    return false;
}

@Override
public RecordReader<NullWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {

    if (!(split instanceof CombineFileSplit)) {
        throw new IllegalArgumentException("split must be a CombineFileSplit");
    }
    RecordReader<NullWritable, Text> r = new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class);
    return r;
    //return null;
}

}


public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {

private final Text mFileText;

public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
                             Integer pathToProcess) throws IOException {

    mProcessed = false;
    mFileToRead = fileSplit.getPath(pathToProcess);
    mFileLength = fileSplit.getLength(pathToProcess);
    mConf = context.getConfiguration();

    assert 0 == fileSplit.getOffset(pathToProcess);
    FileSystem fs = FileSystem.get(mConf);
    assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;

    //    mFileName = new Text();
    mFileText = new Text();
}

@Override
public void close() throws IOException {
    mFileText.clear();
}


@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    return mFileText;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return (mProcessed) ? (float) 1.0 : (float) 0.0;
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    // no-op.
}


@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!mProcessed) {
        if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
        }
        byte[] contents = new byte[(int) mFileLength];

        FileSystem fs = mFileToRead.getFileSystem(mConf);
        FSDataInputStream in = null;
        try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);

        } finally {
            IOUtils.closeStream(in);
        }
        mProcessed = true;
        return true;
    }
    return false;
}
}

你能帮帮我吗?

【问题讨论】:

  • 无法理解您想要实现的目标,因为您只需为所有输入编写一张地图。
  • Jijo ,我尝试将 1000 个文件中的数据输入组合到一个映射输入值(对于一个映射器)。例如,据我所知,对于 1000 个文件,它将是 1000 个映射器。它很长。我需要将所有文件中的文本数据合并到一条记录中,然后用于映射输入值。

标签: hadoop mapreduce


【解决方案1】:

映射器的数量不是由文件的数量决定的,而是由组成这些文件的块的数量决定的;因此,Hadoop 将每个文件拆分为块,并为每个文件创建一个映射器。请查看one 之类的链接,以了解有关 Hadoop 如何选择映射器和缩减器数量的更多信息。

如果您确实想要一个映射器,必须说设置此参数mapred.map.tasks 将不起作用,因为这是对 Hadoop 的提示,而不是强制参数。您可以尝试将块大小增加到一个非常高的数字...

无论如何,在 Hadoop 中使用单个映射器是没有意义的……您将错过数据的分布式处理,这是这种系统的优势之一。

【讨论】:

    猜你喜欢
    • 2014-01-06
    • 1970-01-01
    • 2011-04-12
    • 1970-01-01
    • 1970-01-01
    • 2013-10-01
    • 2015-07-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多