【问题标题】:hadoop method to send output to multiple directorieshadoop方法将输出发送到多个目录
【发布时间】:2013-11-18 05:10:31
【问题描述】:

我的MapReduce 作业按日期处理数据,需要将输出写入某个文件夹结构。目前的期望是生成以下结构的输出:

2013
    01
    02
    ..

2012
    01
    02
    ..

等等

在任何时候,我最多只能获得 12 个月的数据,因此,我使用 MultipleOutputs 类在驱动程序中使用以下函数创建 12 个输出:

public void createOutputs(){
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;

    // Create multiple outputs for last 12 months
    // TODO make 12 configurable
    for(int i = 0; i < 12; ++i ){
        //Get month and add 1 as month is 0 based index
        int month = c.get(Calendar.MONTH)+1; 
        //Add leading 0
        monthStr = month > 10 ? "" + month : "0" + month ;  
        // Generate path string in the format 2013/03/etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr );  
        // Move to previous month
        c.add(Calendar.MONTH, -1); 
    }
}

在reducer 中,我添加了一个清理函数来将生成的输出移动到适当的目录。

protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}

问题:在输出从 _temporary 目录移动到输出目录之前,reducer 的清理功能正在执行。因此,上述函数在执行时看不到任何输出,因为所有数据仍在 _temporary 目录中。

实现所需功能的最佳方式是什么? 欣赏任何见解。

考虑以下几点:

  • 有没有办法使用自定义输出提交器?
  • 链接另一个工作更好还是这样做有点矫枉过正?
  • 有没有我不知道的更简单的替代方案..

这是来自cleanup函数的文件结构示例日志:

MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000

【问题讨论】:

    标签: java hadoop mapreduce hdfs


    【解决方案1】:

    你不应该需要第二份工作。我目前正在使用 MultipleOutputs 在我的一个程序中创建大量输出目录。尽管有超过 30 个目录,但我只能使用几个 MultipleOutputs 对象。这是因为你可以在写的时候设置输出目录,所以只有在需要的时候才能确定。如果你想以不同的格式输出,你实际上只需要一个以上的 namedOutput(例如,一个键:Text.class,值:Text.class,一个键:Text.class 和值:IntWritable.class)

    设置:

    MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);
    

    减速机的设置:

    mout = new MultipleOutputs<Text, Text>(context);
    

    在 reducer 中调用 mout:

    String key; //set to whatever output key will be
    String value; //set to whatever output value will be
    String outputFileName; //set to absolute path to file where this should write
    
    mout.write("Output",new Text(key),new Text(value),outputFileName);
    

    您可以在编码时让一段代码确定目录。例如,假设您要按月和年指定目录:

    int year;//extract year from data
    int month;//extract month from data
    String baseFileName; //parent directory to all outputs from this job
    String outputFileName = baseFileName + "/" + year + "/" + month;
    
    mout.write("Output",new Text(key),new Text(value),outputFileName);
    

    希望这会有所帮助。

    编辑:以上示例的输出文件结构:

    Base
        2013
            01
            02
            03
            ...
        2012
            01
            ...
        ...
    

    【讨论】:

      【解决方案2】:

      很可能你错过了在清理中关闭 mos。

      如果您在 mapper 或 reducer 中有如下设置:

      public void setup(Context context) {mos = new MultipleOutputs(context);}
      

      您应该在清理开始时关闭 mos,如下所示..

      public void cleanup(Context context ) throws IOException, InterruptedException {mos.close();}
      

      【讨论】:

        猜你喜欢
        • 2018-07-17
        • 2014-10-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-01-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多