Well, there is no built-in way to unzip a file inside the Hadoop file system, but after digging around for a while I worked out how to do it directly in HDFS. The prerequisite is that you copy the zip file to some HDFS location and then run a MapReduce job. Obviously Hadoop does not understand the zip file format as an input, so we need to customize the Mapper and Reducer so that we control what the mapper emits and what the reducer consumes. Note that this MapReduce job will run on a single mapper, because when customizing the RecordReader class provided by Hadoop we disable splitting, i.e. isSplitable() returns false. The job therefore emits the file name as the key and the content of the uncompressed file as the value. When the reducer consumes it, I set the output value to null, so only the unzipped content (carried in the key) is kept, and the number of reducers is set to 1, so everything is dumped into a single part file.
We all know that Hadoop cannot handle zip files on its own, but Java can, with the help of its own ZipFile facilities: it can read zip file contents through a ZipInputStream and walk the archive through ZipEntry objects. So let's write a custom ZipFileInputFormat class that extends FileInputFormat.
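Before diving into the Hadoop side, here is a minimal, self-contained sketch of that plain-Java pattern (the sample.zip path is just a placeholder); the custom RecordReader further below applies exactly the same loop to a stream opened from HDFS instead of the local disk:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ZipWalkDemo {
    public static void main(String[] args) throws IOException {
        // Placeholder path; in the MapReduce code below the stream comes from HDFS
        String zipPath = "sample.zip";
        try (ZipInputStream zip = new ZipInputStream(new FileInputStream(zipPath))) {
            ZipEntry entry;
            // getNextEntry() positions the stream at the start of each archived file
            while ((entry = zip.getNextEntry()) != null) {
                byte[] buffer = new byte[8192];
                int total = 0;
                int read;
                // read() returns the *uncompressed* bytes of the current entry
                while ((read = zip.read(buffer, 0, buffer.length)) > 0) {
                    total += read;
                }
                System.out.println(entry.getName() + " -> " + total + " bytes uncompressed");
                zip.closeEntry();
            }
        }
    }
}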
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /** See the comments on the setLenient() method */
    private static boolean isLenient = false;

    /**
     * ZIP files are not splittable, so this method always returns false;
     * each archive is processed by a single mapper.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Create the ZipFileRecordReader to parse the file.
     */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new ZipFileRecordReader();
    }

    /**
     * When lenient mode is enabled, corrupt ZIP entries are skipped instead
     * of failing the whole job.
     *
     * @param lenient true to ignore ZIP parsing errors
     */
    public static void setLenient(boolean lenient) {
        isLenient = lenient;
    }

    public static boolean getLenient() {
        return isLenient;
    }
}
Note that createRecordReader() returns a ZipFileRecordReader, which is the custom version of Hadoop's RecordReader class we have been talking about. Now let's dig into the RecordReader class a bit.
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class ZipFileRecordReader extends RecordReader<Text, BytesWritable> {

    /** InputStream used to read the ZIP file from the FileSystem */
    private FSDataInputStream fsin;

    /** ZIP file parser/decompressor */
    private ZipInputStream zip;

    /** Uncompressed file name */
    private Text currentKey;

    /** Uncompressed file contents */
    private BytesWritable currentValue;

    /** Used to indicate progress */
    private boolean isFinished = false;

    /**
     * Initialise and open the ZIP file from the FileSystem.
     */
    @Override
    public void initialize(InputSplit inputSplit,
            TaskAttemptContext taskAttemptContext) throws IOException,
            InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Open the stream
        fsin = fs.open(path);
        zip = new ZipInputStream(fsin);
    }

    /**
     * Each ZipEntry is decompressed and readied for the Mapper. The contents
     * of each file are held *in memory* in a BytesWritable object.
     *
     * If the ZipFileInputFormat has been set to lenient (not the default),
     * certain exceptions will be gracefully ignored to prevent a larger job
     * from failing.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        ZipEntry entry = null;
        try {
            entry = zip.getNextEntry();
        } catch (ZipException e) {
            if (ZipFileInputFormat.getLenient() == false)
                throw e;
        }
        // Sanity check
        if (entry == null) {
            isFinished = true;
            return false;
        }
        // Filename
        currentKey = new Text(entry.getName());
        if (currentKey.toString().endsWith(".zip")) {
            // Nested .zip entries are read as-is; their raw bytes become the value
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] temp1 = new byte[8192];
            while (true) {
                int bytesread1 = 0;
                try {
                    bytesread1 = zip.read(temp1, 0, 8192);
                } catch (EOFException e) {
                    if (ZipFileInputFormat.getLenient() == false)
                        throw e;
                    return false;
                }
                if (bytesread1 > 0)
                    bos.write(temp1, 0, bytesread1);
                else
                    break;
            }
            zip.closeEntry();
            currentValue = new BytesWritable(bos.toByteArray());
            return true;
        }
        // Read the file contents
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] temp = new byte[8192];
        while (true) {
            int bytesRead = 0;
            try {
                bytesRead = zip.read(temp, 0, 8192);
            } catch (EOFException e) {
                if (ZipFileInputFormat.getLenient() == false)
                    throw e;
                return false;
            }
            if (bytesRead > 0)
                bos.write(temp, 0, bytesRead);
            else
                break;
        }
        zip.closeEntry();
        // Uncompressed contents
        currentValue = new BytesWritable(bos.toByteArray());
        return true;
    }

    /**
     * Rather than calculating progress, we just keep it simple.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isFinished ? 1 : 0;
    }

    /**
     * Returns the current key (name of the zipped file).
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    /**
     * Returns the current value (contents of the zipped file).
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return currentValue;
    }

    /**
     * Close quietly, ignoring any exceptions.
     */
    @Override
    public void close() throws IOException {
        try {
            zip.close();
        } catch (Exception ignore) {
        }
        try {
            fsin.close();
        } catch (Exception ignore) {
        }
    }
}
For convenience, I have added some comments in the source code so you can easily see how the file contents are read and written through an in-memory buffer. Now let's write the Mapper class that consumes the output of the above.
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        String filename = key.toString();
        // We only want to process .txt files
        if (filename.endsWith(".txt") == false)
            return;
        // Prepare the content; use getLength(), because getBytes() may return
        // a backing array that is longer than the actual data
        String content = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        context.write(new Text(content), one);
    }
}
Let's quickly write the corresponding Reducer:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // context.write(key, new IntWritable(sum));
        // Emit a null value so only the key (the uncompressed content) is written
        context.write(new Text(key), null);
    }
}
Let's quickly wire up the Job configuration for the Mapper and Reducer:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.saama.CustomisedMapperReducer.MyMapper;
import com.saama.CustomisedMapperReducer.MyReducer;
import com.saama.CustomisedMapperReducer.ZipFileInputFormat;

public class MyJob {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setInputFormatClass(ZipFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        ZipFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
Note that in the job class we configure the InputFormatClass to be our ZipFileInputFormat class, while the OutputFormatClass is the standard TextOutputFormat class.
Mavenize the project and keep the dependencies as they are to build the code. Export the jar file and deploy it onto your Hadoop cluster; this was tested and deployed on CDH 5.5 (YARN). The contents of the POM file are below, followed by a quick build-and-run sketch.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mithun</groupId>
    <artifactId>CustomisedMapperReducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>CustomisedMapperReducer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
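Once the project builds, a run might look roughly like the following. This is only a sketch: the driver's fully qualified class name (com.saama.CustomisedMapperReducer.MyJob) and the HDFS paths are assumptions you will need to adapt to your own package layout and cluster.

# Build the jar (uses the POM above; the jar name follows from artifactId and version)
mvn clean package

# Copy the zip archive into HDFS; paths are examples only
hdfs dfs -mkdir -p /user/hadoop/zip_input
hdfs dfs -put archive.zip /user/hadoop/zip_input/

# Run the job: args[0] is the input path, args[1] the (not yet existing) output path
hadoop jar target/CustomisedMapperReducer-0.0.1-SNAPSHOT.jar \
  com.saama.CustomisedMapperReducer.MyJob \
  /user/hadoop/zip_input /user/hadoop/zip_output

# With a single reducer, the unzipped content ends up in one part file
hdfs dfs -cat /user/hadoop/zip_output/part-r-00000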