【发布时间】:2011-12-04 12:38:39
【问题描述】:
我想在每天运行 Hadoop 作业时覆盖/重用现有的输出目录。 实际上输出目录将存储每天作业运行结果的汇总输出。 如果我指定相同的输出目录,则会给出错误“输出目录已存在”。
如何绕过此验证?
【问题讨论】:
标签: hadoop rewrite fileoutputstream
我想在每天运行 Hadoop 作业时覆盖/重用现有的输出目录。 实际上输出目录将存储每天作业运行结果的汇总输出。 如果我指定相同的输出目录,则会给出错误“输出目录已存在”。
如何绕过此验证?
【问题讨论】:
标签: hadoop rewrite fileoutputstream
在运行作业之前删除目录怎么样?
您可以通过 shell 执行此操作:
hadoop fs -rmr /path/to/your/output/
或通过 Java API:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
【讨论】:
Jungblut 的答案是您的直接解决方案。由于我从不相信自动化流程会删除内容(我个人),所以我会建议一个替代方案:
我建议您将作业的输出名称设置为动态的,而不是尝试覆盖,包括它运行的时间。
类似“/path/to/your/output-2011-10-09-23-04/”的东西。这样你就可以保留旧的工作输出,以防你需要重新访问。在我的系统中,每天运行 10 多个工作,我们将输出构造为:/output/job1/2011/10/09/job1out/part-r-xxxxx、/output/job1/2011/10/10/job1out/part-r-xxxxx 等。
【讨论】:
Hadoop 的TextInputFormat(我猜您正在使用)不允许覆盖现有目录。可能是为了原谅您发现您错误地删除了您(和您的集群)非常努力的东西的痛苦。
但是,如果您确定希望您的输出文件夹被作业覆盖,我相信最干净的方法是将TextOutputFormat 更改为如下所示:
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
现在您正在使用 overwrite=true 创建FSDataOutputStream (fs.create(file, true))。
【讨论】:
FileOutputFormat 可确保在作业初始化时输出目录不存在。
mapred 与 mapreduce API?我的评论是关于后者。
Hadoop 已经支持您似乎试图通过允许多个输入路径到作业来实现的效果。与其尝试拥有一个向其中添加更多文件的文件目录,不如拥有一个向其中添加新目录的目录。要将聚合结果用作输入,只需将输入 glob 指定为子目录上的通配符(例如,my-aggregate-output/*)。要将新数据“追加”到聚合作为输出,只需指定聚合的新唯一子目录作为输出目录,通常使用时间戳或从输入数据派生的一些序列号(例如my-aggregate-output/20140415154424)。
【讨论】:
如果将输入文件(例如附加条目)从本地文件系统加载到 hadoop 分布式文件系统,如下所示:
hdfs dfs -put /mylocalfile /user/cloudera/purchase
然后也可以用-f 覆盖/重用现有的输出目录。无需删除或重新创建文件夹
hdfs dfs -put -f /updated_mylocalfile /user/cloudera/purchase
【讨论】:
Hadoop 遵循哲学一次写入,多次读取。 因此,当您尝试再次写入目录时,它假定它必须创建一个新目录(一次写入)但它已经存在,所以它抱怨。您可以通过hadoop fs -rmr /path/to/your/output/ 删除它。最好创建一个动态目录(例如,基于时间戳或哈希值)以保存数据。
【讨论】:
您可以按时间为每次执行创建一个输出子目录。例如,假设您希望用户输出目录,然后将其设置如下:
FileOutputFormat.setOutputPath(job, new Path(args[1]);
通过以下几行更改它:
String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
【讨论】:
我有一个类似的用例,我使用MultipleOutputs 来解决这个问题。
例如,如果我希望不同的 MapReduce 作业写入同一目录 /outputDir/。作业 1 写入 /outputDir/job1-part1.txt,作业 2 写入 /outputDir/job1-part2.txt(不删除现有文件)。
在main中,将输出目录设置为随机目录(可以在新作业运行之前将其删除)
FileInputFormat.addInputPath(job, new Path("/randomPath"));
在 reducer/mapper 中,使用MultipleOutputs 并设置 writer 写入所需的目录:
public void setup(Context context) {
MultipleOutputs mos = new MultipleOutputs(context);
}
和:
mos.write(key, value, "/outputDir/fileOfJobX.txt")
但是,我的用例比这要复杂一些。如果只是写入同一个平面目录,你可以写入不同的目录并运行脚本来迁移文件,如:hadoop fs -mv /tmp/* /outputDir
在我的用例中,每个 MapReduce 作业根据正在写入的消息的值写入不同的子目录。目录结构可以是多层的,如:
/outputDir/
messageTypeA/
messageSubTypeA1/
job1Output/
job1-part1.txt
job1-part2.txt
...
job2Output/
job2-part1.txt
...
messageSubTypeA2/
...
messageTypeB/
...
每个 Mapreduce 作业可以写入数千个子目录。而且写入 tmp 目录并将每个文件移动到正确目录的成本很高。
【讨论】:
我遇到了这个确切的问题,它源于 checkOutputSpecs 类 FileOutputFormat 中引发的异常。就我而言,我希望有很多工作将文件添加到已经存在的目录中,并且我保证这些文件将具有唯一的名称。
我通过创建一个输出格式类解决了这个问题,该类仅覆盖 checkOutputSpecs 方法并窒息(忽略)FileAlreadyExistsException,它检查目录是否已存在。
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
try {
super.checkOutputSpecs(job);
}catch (FileAlreadyExistsException ignored){
// Suffocate the exception
}
}
}
在作业配置中,我使用了LazyOutputFormat 和MultipleOutputs。
LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
【讨论】:
您需要在主类中添加设置:
//Configuring the output path from the filesystem into the job
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//auto_delete output dir
OutputPath.getFileSystem(conf).delete(OutputPath);
【讨论】: