【Question Title】: Hadoop - Decompress zip files
【Posted】: 2015-09-22 10:16:44
【Question Description】:

I have many compressed files in zip format (GBs in size) and I want to write a map-only job to decompress them. My mapper class looks like:

import java.util.zip.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.*;

public class DecompressMapper extends Mapper <LongWritable, Text, LongWritable, Text>
{
    private static final int BUFFER_SIZE = 4096;

    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Context context) throws IOException
    {
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        this.unzip(fileName, new File(fileName).getParent()  + File.separator +  "/test_poc");  
    }

    public void unzip(String zipFilePath, String destDirectory) throws IOException {
        File destDir = new File(destDirectory);
        if (!destDir.exists()) {
            destDir.mkdir();
        }
        ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
        ZipEntry entry = zipIn.getNextEntry();
        // iterates over entries in the zip file
        while (entry != null) {
            String filePath = destDirectory + File.separator + entry.getName();
            if (!entry.isDirectory()) {
                // if the entry is a file, extracts it
                extractFile(zipIn, filePath);
            } else {
                // if the entry is a directory, make the directory
                File dir = new File(filePath);
                dir.mkdir();
            }
            zipIn.closeEntry();
            entry = zipIn.getNextEntry();
        }
        zipIn.close();
    }

    private void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
        byte[] bytesIn = new byte[BUFFER_SIZE];
        int read = 0;
        while ((read = zipIn.read(bytesIn)) != -1) {
            bos.write(bytesIn, 0, read);
        }
        bos.close();
    }
}

And my driver class:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DecompressJob extends Configured implements Tool{

    public static void main(String[] args) throws Exception
    {
      int res = ToolRunner.run(new Configuration(), new DecompressJob(),args);
      System.exit(res);
    }

    public int run(String[] args) throws Exception
    {
        Job conf = Job.getInstance(getConf());
        conf.setJobName("MapperOnly");

        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(DecompressMapper.class);
        conf.setNumReduceTasks(0);

        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);

        return conf.waitForCompletion(true) ? 0: 1;
    }
}

It seems my mapper class is not working properly. I am not getting the decompressed files in the desired directory. Any help is appreciated. Thanks...

【Question Comments】:

  • In which directory are you getting the output?
  • I am not getting the output in the desired directory. The code above does not decompress the zipped files; I only get unwanted output in the output folder supplied when running the map reduce job.
  • Another solution: *.com/a/61138526/5698795

Tags: hadoop mapreduce compression


【Solution 1】:

The code above has a few problems:

  1. I was mixing the MR1 API and the MR2 API. Never do that.
  2. Java IO functions were used. Hadoop cannot work with files on its own file system (HDFS) through plain java.io calls.
  3. The paths were not generated correctly.

We need to be careful when writing map reduce programs, because Hadoop uses a completely different file system and the code has to go through that file system's API; and never mix the MR1 and MR2 APIs. A minimal corrected sketch along these lines follows.
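
By way of illustration only (this is not the original poster's code), here is a minimal sketch of a map-only mapper that addresses the three points above: it uses only the new mapreduce API, it reads and writes through Hadoop's FileSystem API instead of java.io, and it builds the destination path from the zip's location on HDFS. The "test_poc" directory name is carried over from the question; the once-per-task guard and the assumption that each zip arrives as a single, unsplit input are simplifications, and Solution 2 below shows the more robust route with a custom, non-splittable input format.

    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Hypothetical sketch: unzip the file backing the current split directly into HDFS.
    public class HdfsUnzipMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

        private static final int BUFFER_SIZE = 4096;
        private boolean alreadyExtracted = false; // extract the split's zip only once per task

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (alreadyExtracted) {
                return;
            }
            alreadyExtracted = true;

            // MR2 way of finding out which file backs this split
            FileSplit split = (FileSplit) context.getInputSplit();
            Path zipPath = split.getPath();

            Configuration conf = context.getConfiguration();
            FileSystem fs = zipPath.getFileSystem(conf);

            // Destination directory on HDFS, next to the zip file (assumed layout)
            Path destDir = new Path(zipPath.getParent(), "test_poc");
            fs.mkdirs(destDir);

            // Read the zip through the Hadoop FileSystem API, not java.io.FileInputStream
            try (ZipInputStream zipIn = new ZipInputStream(fs.open(zipPath))) {
                ZipEntry entry;
                byte[] buffer = new byte[BUFFER_SIZE];
                while ((entry = zipIn.getNextEntry()) != null) {
                    Path target = new Path(destDir, entry.getName());
                    if (entry.isDirectory()) {
                        fs.mkdirs(target);
                        continue;
                    }
                    // Write the extracted entry back to HDFS
                    try (FSDataOutputStream out = fs.create(target, true)) {
                        int read;
                        while ((read = zipIn.read(buffer)) != -1) {
                            out.write(buffer, 0, read);
                        }
                    }
                    zipIn.closeEntry();
                }
            }
        }
    }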

【Discussion】:

    【Solution 2】:

    Well, there is no out-of-the-box way to decompress files in the Hadoop file system, but after a long research I figured out how to decompress them directly inside HDFS. The precondition is that you copy the zip files to a certain location and then run a mapreduce job. Obviously Hadoop does not understand the zip file input format, so we need to customise the Mapper as well as the Reducer so that we can control what the mapper emits and what the reducer consumes. Note that this mapreduce will run on a single Mapper, because when customising the Record Reader class provided by Hadoop we disable the split method, i.e. make it return false. So the mapreduce will take the Filename as the Key and the Content of the uncompressed file as the Value. When the reducer consumes it, I make the output key null, so only the unzipped content remains with the reducer, and the number of reducers is set to one so that everything gets dumped into a single part file.

    We all know that Hadoop cannot handle zip files on its own, but Java can with the help of its own zip classes: it can read zip file contents through ZipInputStream and read each zip entry through ZipEntry. So we write a custom ZipFileInputFormat class that extends FileInputFormat:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {
        /** See the comments on the setLenient() method */
        private static boolean isLenient = false;

        /**
         * ZIP files are not splittable, so this always returns false and each
         * file is handed to a single mapper as one split.
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        /**
         * Create the ZipFileRecordReader to parse the file.
         */
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException,
                InterruptedException {
            return new ZipFileRecordReader();
        }

        /**
         * @param lenient whether certain exceptions should be ignored (see ZipFileRecordReader)
         */
        public static void setLenient(boolean lenient) {
            isLenient = lenient;
        }

        public static boolean getLenient() {
            return isLenient;
        }
    }
    

    Note that the createRecordReader method returns a ZipFileRecordReader, i.e. the customised version of Hadoop's RecordReader class that we have been discussing. Now let us walk through the RecordReader class:

    import java.io.ByteArrayOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipException;
    import java.util.zip.ZipInputStream;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class ZipFileRecordReader extends RecordReader<Text, BytesWritable> {
        /** InputStream used to read the ZIP file from the FileSystem */
        private FSDataInputStream fsin;

        /** ZIP file parser/decompressor */
        private ZipInputStream zip;

        /** Uncompressed file name */
        private Text currentKey;

        /** Uncompressed file contents */
        private BytesWritable currentValue;

        /** Used to indicate progress */
        private boolean isFinished = false;

        /**
         * Initialise and open the ZIP file from the FileSystem.
         */
        @Override
        public void initialize(InputSplit inputSplit,
                TaskAttemptContext taskAttemptContext) throws IOException,
                InterruptedException {
            FileSplit split = (FileSplit) inputSplit;
            Configuration conf = taskAttemptContext.getConfiguration();
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);

            // Open the stream
            fsin = fs.open(path);
            zip = new ZipInputStream(fsin);
        }

        /**
         * Each ZipEntry is decompressed and readied for the Mapper. The contents
         * of each file are held *in memory* in a BytesWritable object.
         *
         * If the ZipFileInputFormat has been set to Lenient (not the default),
         * certain exceptions will be gracefully ignored to prevent a larger job
         * from failing.
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            ZipEntry entry = null;
            try {
                entry = zip.getNextEntry();
            } catch (ZipException e) {
                if (ZipFileInputFormat.getLenient() == false)
                    throw e;
            }

            // Sanity check
            if (entry == null) {
                isFinished = true;
                return false;
            }

            // Filename
            currentKey = new Text(entry.getName());

            if (currentKey.toString().endsWith(".zip")) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] temp1 = new byte[8192];
                while (true) {
                    int bytesread1 = 0;
                    try {
                        bytesread1 = zip.read(temp1, 0, 8192);
                    } catch (EOFException e) {
                        if (ZipFileInputFormat.getLenient() == false)
                            throw e;
                        return false;
                    }
                    if (bytesread1 > 0)
                        bos.write(temp1, 0, bytesread1);
                    else
                        break;
                }

                zip.closeEntry();
                currentValue = new BytesWritable(bos.toByteArray());
                return true;
            }

            // Read the file contents
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] temp = new byte[8192];
            while (true) {
                int bytesRead = 0;
                try {
                    bytesRead = zip.read(temp, 0, 8192);
                } catch (EOFException e) {
                    if (ZipFileInputFormat.getLenient() == false)
                        throw e;
                    return false;
                }
                if (bytesRead > 0)
                    bos.write(temp, 0, bytesRead);
                else
                    break;
            }
            zip.closeEntry();

            // Uncompressed contents
            currentValue = new BytesWritable(bos.toByteArray());
            return true;
        }

        /**
         * Rather than calculating progress, we just keep it simple.
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return isFinished ? 1 : 0;
        }

        /**
         * Returns the current key (name of the zipped file).
         */
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        /**
         * Returns the current value (contents of the zipped file).
         */
        @Override
        public BytesWritable getCurrentValue() throws IOException,
                InterruptedException {
            return currentValue;
        }

        /**
         * Close quietly, ignoring any exceptions.
         */
        @Override
        public void close() throws IOException {
            try {
                zip.close();
            } catch (Exception ignore) {
            }
            try {
                fsin.close();
            } catch (Exception ignore) {
            }
        }
    }
    

    For convenience I have put some comments in the source code so you can easily see how files are read and written using an in-memory buffer. Now let us write the Mapper class that consumes the above:

    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MyMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {

            String filename = key.toString();

            // We only want to process .txt files
            if (filename.endsWith(".txt") == false)
                return;

            // Prepare the content; use getLength() because the backing array of a
            // BytesWritable may be larger than the actual data
            String content = new String(value.getBytes(), 0, value.getLength(), "UTF-8");

            context.write(new Text(content), one);
        }
    }
    

    Let us quickly write the Reducer for the same:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // context.write(key, new IntWritable(sum));
            context.write(new Text(key), null);
        }
    }
    

    Let us quickly configure the Job for the Mapper and Reducer:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import com.saama.CustomisedMapperReducer.MyMapper;
    import com.saama.CustomisedMapperReducer.MyReducer;
    import com.saama.CustomisedMapperReducer.ZipFileInputFormat;
    import com.saama.CustomisedMapperReducer.ZipFileRecordReader;

    public class MyJob {

        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();

            Job job = new Job(conf);
            job.setJarByClass(MyJob.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setInputFormatClass(ZipFileInputFormat.class);
            // Note: the output format is set with setOutputFormatClass, using the
            // new-API TextOutputFormat from mapreduce.lib.output
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            ZipFileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setNumReduceTasks(1);

            job.waitForCompletion(true);
        }
    }
    

    Note that in the job class we have configured the InputFormatClass to be our ZipFileInputFormat class and the OutputFormatClass to be the TextOutputFormat class.

    Mavenize the project and keep the dependencies as-is to run the code. Export the Jar file and deploy it on your Hadoop cluster. Tested and deployed on CDH 5.5 YARN. The contents of the POM file are:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.mithun</groupId>
        <artifactId>CustomisedMapperReducer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>CustomisedMapperReducer</name>
        <url>http://maven.apache.org</url>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.0</version>
            </dependency>

            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>

            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>
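
    Once built, the job can be launched in the usual way with the hadoop jar command, for example something along the lines of `hadoop jar CustomisedMapperReducer-0.0.1-SNAPSHOT.jar com.saama.CustomisedMapperReducer.MyJob <hdfs-dir-with-zip-files> <hdfs-output-dir>`. The jar and class names here are only inferred from the POM and package names above, so adjust them to match your own build.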
    

    【Discussion】: