【发布时间】:2013-11-18 05:10:31
【问题描述】:
我的MapReduce 作业按日期处理数据,需要将输出写入某个文件夹结构。目前的期望是生成以下结构的输出:
2013
01
02
..
2012
01
02
..
等等
在任何时候,我最多只能获得 12 个月的数据,因此,我使用 MultipleOutputs 类在驱动程序中使用以下函数创建 12 个输出:
public void createOutputs(){
Calendar c = Calendar.getInstance();
String monthStr, pathStr;
// Create multiple outputs for last 12 months
// TODO make 12 configurable
for(int i = 0; i < 12; ++i ){
//Get month and add 1 as month is 0 based index
int month = c.get(Calendar.MONTH)+1;
//Add leading 0
monthStr = month > 10 ? "" + month : "0" + month ;
// Generate path string in the format 2013/03/etl
pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
// Add the named output
MultipleOutputs.addNamedOutput(config, pathStr );
// Move to previous month
c.add(Calendar.MONTH, -1);
}
}
在reducer 中,我添加了一个清理函数来将生成的输出移动到适当的目录。
protected void cleanup(Context context) throws IOException, InterruptedException {
// Custom function to recursively process data
moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}
问题:在输出从 _temporary 目录移动到输出目录之前,reducer 的清理功能正在执行。因此,上述函数在执行时看不到任何输出,因为所有数据仍在 _temporary 目录中。
实现所需功能的最佳方式是什么? 欣赏任何见解。
考虑以下几点:
- 有没有办法使用自定义输出提交器?
- 链接另一个工作更好还是这样做有点矫枉过正?
- 有没有我不知道的更简单的替代方案..
这是来自cleanup函数的文件结构示例日志:
MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000
【问题讨论】:
标签: java hadoop mapreduce hdfs