【Posted】: 2015-04-16 08:16:42
【Question】:
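How can I use only one map for multiple input files? Hadoop creates one mapper per file, but I need a single mapper to process all the files.
I tried using CombineFileInputFormat. It gives me one mapper, but each map input contains only one file. I need the map input value to contain the data from all the files (text format), like this:
Map input value:
data from file1.txt
data from file2.txt
data from file3.txt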
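import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {
    public WholeFileInputFormat() {
        super();
        // 64 MiB upper bound per combined split.
        setMaxSplitSize(67108864);
    }

    // Never split an individual file; each file is read as a whole.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    // CombineFileRecordReader creates one WholeFileRecordReader per file in the
    // split, so the mapper still receives one record per file.
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        if (!(split instanceof CombineFileSplit)) {
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }
        return new CombineFileRecordReader<NullWritable, Text>(
                (CombineFileSplit) split, context, WholeFileRecordReader.class);
    }
}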
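import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Reads one whole file of a CombineFileSplit as a single (NullWritable, Text) record.
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private final Path mFileToRead;
    private final long mFileLength;
    private final Configuration mConf;
    private final Text mFileText;
    private boolean mProcessed;

    // pathToProcess is the index of this reader's file within the split.
    public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
            Integer pathToProcess) throws IOException {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();
        assert 0 == fileSplit.getOffset(pathToProcess);
        // Resolve the file system from the path itself, as nextKeyValue() does.
        FileSystem fs = mFileToRead.getFileSystem(mConf);
        assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
        mFileText = new Text();
    }

    @Override
    public void close() throws IOException {
        mFileText.clear();
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return mProcessed ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // no-op: everything is set up in the constructor.
    }

    // Emits exactly one record: the full contents of this reader's file.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!mProcessed) {
            if (mFileLength > (long) Integer.MAX_VALUE) {
                throw new IOException("File is longer than Integer.MAX_VALUE.");
            }
            byte[] contents = new byte[(int) mFileLength];
            FileSystem fs = mFileToRead.getFileSystem(mConf);
            FSDataInputStream in = null;
            try {
                // Set the contents of this file.
                in = fs.open(mFileToRead);
                IOUtils.readFully(in, contents, 0, contents.length);
                mFileText.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            mProcessed = true;
            return true;
        }
        return false;
    }
}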
Can you help me?
【Comments】:
-
I can't understand what you are trying to achieve, since you only need to write one map for all the inputs.
-
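Jijo, I am trying to combine the input data from 1000 files into a single map input value (for one mapper). As far as I know, 1000 files will mean 1000 mappers, and that takes a long time. I need to merge the text data from all the files into one record and then use that record as the map input value.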
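
To make the requested result concrete, here is a minimal sketch of the single combined value described in the comment above. It uses plain java.nio on local files and no Hadoop API at all; the class name CombinedValueSketch, the method combine, and the newline separator between files are assumptions made for illustration only, not something taken from the question's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Hadoop): builds one combined text value
// from several local files, in the order the files are given.
class CombinedValueSketch {

    // Reads every file fully and joins the contents with a newline,
    // so each file's data starts on its own line of the combined value.
    static String combine(List<Path> files) throws IOException {
        StringBuilder combined = new StringBuilder();
        for (Path file : files) {
            byte[] bytes = Files.readAllBytes(file);
            if (combined.length() > 0) {
                combined.append('\n'); // assumed separator between files
            }
            combined.append(new String(bytes, StandardCharsets.UTF_8));
        }
        return combined.toString();
    }

    public static void main(String[] args) throws IOException {
        // Create three small files in a temporary directory for the demo.
        Path dir = Files.createTempDirectory("combine-sketch");
        Path f1 = Files.write(dir.resolve("file1.txt"),
                "data from file1.txt".getBytes(StandardCharsets.UTF_8));
        Path f2 = Files.write(dir.resolve("file2.txt"),
                "data from file2.txt".getBytes(StandardCharsets.UTF_8));
        Path f3 = Files.write(dir.resolve("file3.txt"),
                "data from file3.txt".getBytes(StandardCharsets.UTF_8));

        // One value that holds the text of all three files.
        System.out.println(combine(Arrays.asList(f1, f2, f3)));
    }
}
```

In a Hadoop job, the equivalent concatenation would presumably have to happen inside a custom RecordReader that loops over all the paths of the CombineFileSplit, because CombineFileRecordReader hands each file to its own per-file reader and therefore produces one record per file.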