【问题标题】:How to overwrite/reuse the existing output path for Hadoop jobs again and agian如何再次覆盖/重用 Hadoop 作业的现有输出路径
【发布时间】:2011-12-04 12:38:39
【问题描述】:

我想在每天运行 Hadoop 作业时覆盖/重用现有的输出目录。 实际上输出目录将存储每天作业运行结果的汇总输出。 如果我指定相同的输出目录,则会给出错误“输出目录已存在”。

如何绕过此验证?

【问题讨论】:

    标签: hadoop rewrite fileoutputstream


    【解决方案1】:

    在运行作业之前删除目录怎么样?

    您可以通过 shell 执行此操作:

    hadoop fs -rmr /path/to/your/output/
    

    或通过 Java API:

    // configuration should contain reference to your namenode
    FileSystem fs = FileSystem.get(new Configuration());
    // true stands for recursively deleting the folder you gave
    fs.delete(new Path("/path/to/your/output"), true);
    

    【讨论】:

    • 感谢托马斯的回复。我唯一的问题是我不想删除现有的输出目录。每天我将运行我的工作时,我希望其输出与现有输出(旧)合并。我正在考虑的解决方案是将日常工作的输出生成/存储到某个临时目录中,并通过某个脚本将该临时文件夹结构复制粘贴到旧的输出目录中。
    • 然后您必须使用一个文件夹,您可以将所有作业输出复制到该文件夹​​。这也可以用 shellscript 来完成,这个问题没有内置的解决方案。
    【解决方案2】:

    Jungblut 的答案是您的直接解决方案。由于我从不相信自动化流程会删除内容(我个人),所以我会建议一个替代方案:

    我建议您将作业的输出名称设置为动态的,而不是尝试覆盖,包括它运行的时间。

    类似“/path/to/your/output-2011-10-09-23-04/”的东西。这样你就可以保留旧的工作输出,以防你需要重新访问。在我的系统中,每天运行 10 多个工作,我们将输出构造为:/output/job1/2011/10/09/job1out/part-r-xxxxx/output/job1/2011/10/10/job1out/part-r-xxxxx 等。

    【讨论】:

    • +1 即数据保存方法。但请确保有一个垃圾收集守护进程来收集所有过时的目录。否则你的 HDFS 会溢出 ;))。
    • 确实!我们通常会在一周后删除内容。
    • 感谢您的回复。我唯一的问题是 - 我不想删除现有的输出目录。每天我将运行我的工作时,我希望其输出与现有输出(旧)合并。我正在考虑的解决方案是将日常工作的输出生成/存储到某个临时目录中,然后通过某个脚本将该临时文件夹结构复制粘贴到旧的输出目录中。我在 s3 上有文件夹结构,例如“Campaign_Id/Year/Month/day”
    【解决方案3】:

    Hadoop 的TextInputFormat(我猜您正在使用)不允许覆盖现有目录。可能是为了原谅您发现您错误地删除了您(和您的集群)非常努力的东西的痛苦。

    但是,如果您确定希望您的输出文件夹被作业覆盖,我相信最干净的方法是将TextOutputFormat 更改为如下所示:

    public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
    {
          public RecordWriter<K, V> 
          getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
          {
              Configuration conf = job.getConfiguration();
              boolean isCompressed = getCompressOutput(job);
              String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
              CompressionCodec codec = null;
              String extension = "";
              if (isCompressed) 
              {
                  Class<? extends CompressionCodec> codecClass = 
                          getOutputCompressorClass(job, GzipCodec.class);
                  codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
                  extension = codec.getDefaultExtension();
              }
              Path file = getDefaultWorkFile(job, extension);
              FileSystem fs = file.getFileSystem(conf);
              FSDataOutputStream fileOut = fs.create(file, true);
              if (!isCompressed) 
              {
                  return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
              } 
              else 
              {
                  return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
              }
          }
    }
    

    现在您正在使用 overwrite=true 创建FSDataOutputStream (fs.create(file, true))。

    【讨论】:

    • 这个提议的解决方案不仅在概念上存在问题,而且如果没有进一步的更改也将无法工作——检查FileOutputFormat 可确保在作业初始化时输出目录不存在。
    • 除了明显的“在作业前删除文件夹”之外,我还尝试提供一种解决方案。谨言慎行。这总是推荐吗?可能不是,这是一个值得一提的有趣选择吗,IMO,是的。它在过去有效,但不确定现在是否有效。
    • 我认为谨慎回答问题更适合问题的复杂性。对于一个明显是新手的问题,恕我直言,最好避免指向错误的方向。
    • 再回复:过去工作——也许是 mapredmapreduce API?我的评论是关于后者。
    • 查看this answer 以获得更简单的替代方案
    【解决方案4】:

    Hadoop 已经支持您似乎试图通过允许多个输入路径到作业来实现的效果。与其尝试拥有一个向其中添加更多文件的文件目录,不如拥有一个向其中添加新目录的目录。要将聚合结果用作输入,只需将输入 glob 指定为子目录上的通配符(例如,my-aggregate-output/*)。要将新数据“追加”到聚合作为输出,只需指定聚合的新唯一子目录作为输出目录,通常使用时间戳或从输入数据派生的一些序列号(例如my-aggregate-output/20140415154424)。

    【讨论】:

      【解决方案5】:

      如果将输入文件(例如附加条目)从本地文件系统加载到 hadoop 分布式文件系统,如下所示:

      hdfs dfs -put  /mylocalfile /user/cloudera/purchase
      

      然后也可以用-f 覆盖/重用现有的输出目录。无需删除或重新创建文件夹

      hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase
      

      【讨论】:

        【解决方案6】:

        Hadoop 遵循哲学一次写入,多次读取。 因此,当您尝试再次写入目录时,它假定它必须创建一个新目录(一次写入)但它已经存在,所以它抱怨。您可以通过hadoop fs -rmr /path/to/your/output/ 删除它。最好创建一个动态目录(例如,基于时间戳或哈希值)以保存数据。

        【讨论】:

          【解决方案7】:

          您可以按时间为每次执行创建一个输出子目录。例如,假设您希望用户输出目录,然后将其设置如下:

          FileOutputFormat.setOutputPath(job, new Path(args[1]);
          

          通过以下几行更改它:

          String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
          FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
          

          【讨论】:

            【解决方案8】:

            我有一个类似的用例,我使用MultipleOutputs 来解决这个问题。

            例如,如果我希望不同的 MapReduce 作业写入同一目录 /outputDir/。作业 1 写入 /outputDir/job1-part1.txt,作业 2 写入 /outputDir/job1-part2.txt(不删除现有文件)。

            在main中,将输出目录设置为随机目录(可以在新作业运行之前将其删除)

            FileInputFormat.addInputPath(job, new Path("/randomPath"));
            

            在 reducer/mapper 中,使用MultipleOutputs 并设置 writer 写入所需的目录:

            public void setup(Context context) {
                MultipleOutputs mos = new MultipleOutputs(context);
            }
            

            和:

            mos.write(key, value, "/outputDir/fileOfJobX.txt")
            

            但是,我的用例比这要复杂一些。如果只是写入同一个平面目录,你可以写入不同的目录并运行脚本来迁移文件,如:hadoop fs -mv /tmp/* /outputDir

            在我的用例中,每个 MapReduce 作业根据正在写入的消息的值写入不同的子目录。目录结构可以是多层的,如:

            /outputDir/
                messageTypeA/
                    messageSubTypeA1/
                        job1Output/
                            job1-part1.txt
                            job1-part2.txt
                            ...
                        job2Output/
                            job2-part1.txt
                            ...
            
                    messageSubTypeA2/
                    ...
                messageTypeB/
                ...
            

            每个 Mapreduce 作业可以写入数千个子目录。而且写入 tmp 目录并将每个文件移动到正确目录的成本很高。

            【讨论】:

              【解决方案9】:

              我遇到了这个确切的问题,它源于 checkOutputSpecsFileOutputFormat 中引发的异常。就我而言,我希望有很多工作将文件添加到已经存在的目录中,并且我保证这些文件将具有唯一的名称。

              我通过创建一个输出格式类解决了这个问题,该类仅覆盖 checkOutputSpecs 方法并窒息(忽略)FileAlreadyExistsException,它检查目录是否已存在。

              public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
                  @Override
                  public void checkOutputSpecs(JobContext job) throws IOException {
                      try {
                          super.checkOutputSpecs(job);
                      }catch (FileAlreadyExistsException ignored){
                          // Suffocate the exception
                      }
                  }
              }
              

              在作业配置中,我使用了LazyOutputFormatMultipleOutputs

              LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
              

              【讨论】:

                【解决方案10】:

                您需要在主类中添加设置:

                //Configuring the output path from the filesystem into the job
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                //auto_delete output dir
                OutputPath.getFileSystem(conf).delete(OutputPath);
                

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 2015-12-03
                  • 2014-08-17
                  • 2012-07-26
                  • 1970-01-01
                  • 1970-01-01
                  • 2011-05-07
                  • 1970-01-01
                  相关资源
                  最近更新 更多