【问题标题】:Chaining multiple MapReduce jobs in Hadoop在 Hadoop 中链接多个 MapReduce 作业
【发布时间】:2011-01-30 18:31:31
【问题描述】:

在您应用 MapReduce 的许多实际情况中,最终算法最终是几个 MapReduce 步骤。

即Map1 、Reduce1 、Map2 、Reduce2 等。

因此,您可以将最后一个 reduce 的输出用作下一个 map 的输入。

一旦管道成功完成,中间数据是您(通常)不想保留的数据。此外,由于这些中间数据通常是某种数据结构(如“地图”或“集合”),因此您不希望在写入和读取这些键值对时投入太多精力。

在 Hadoop 中推荐的方法是什么?

是否有一个(简单的)示例说明如何以正确的方式处理这些中间数据,包括之后的清理?

【问题讨论】:

  • 使用哪个 mapreduce 框架?
  • 我编辑了问题以澄清我在谈论 Hadoop。
  • 我会为此推荐猪群宝石:github.com/Ganglion/swineherd best, Tobias

标签: hadoop mapreduce


【解决方案1】:

我认为雅虎开发者网络上的这个教程会帮助你:Chaining Jobs

您使用JobClient.runJob()。第一个作业的数据输出路径成为第二个作业的输入路径。这些需要作为参数传递给您的作业,并使用适当的代码来解析它们并为作业设置参数。

我认为上述方法可能是现在较旧的 mapred API 的方式,但它应该仍然有效。新的 mapreduce API 中会有类似的方法,但我不确定它是什么。

至于在作业完成后删除中间数据,您可以在代码中执行此操作。我以前做的方式是使用类似的东西:

FileSystem.delete(Path f, boolean recursive);

路径是数据在 HDFS 上的位置。您需要确保仅在其他工作不需要时才删除此数据。

【讨论】:

  • 感谢您提供指向 Yahoo 教程的链接。如果两者在同一运行中,Chaining Jobs 确实是您想要的。我一直在寻找的是,如果您希望能够单独运行它们,那么简单的方法是什么。在提到的教程中,我发现了 SequenceFileOutputFormat“写入适合读取后续 MapReduce 作业的二进制文件”和匹配的 SequenceFileInputFormat,这使得这一切变得非常容易。谢谢。
【解决方案2】:

有很多方法可以做到。

(1) 级联作业

为第一个作业创建 JobConf 对象“job1”,并将所有参数设置为“input”作为输入目录,“temp”作为输出目录。执行此作业:

JobClient.run(job1).

在它的正下方,为第二个作业创建 JobConf 对象“job2”,并将所有参数设置为“temp”作为输入目录,“output”作为输出目录。执行此作业:

JobClient.run(job2).

(2) 创建两个 JobConf 对象并设置其中的所有参数,就像 (1) 一样,只是你不使用JobClient.run。

然后创建两个以jobconfs为参数的Job对象:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

使用 jobControl 对象,您指定作业依赖项,然后运行作业:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) 如果你需要一个有点像 Map+ | 的结构减少 | Map*,您可以使用 Hadoop 0.19 及更高版本附带的 ChainMapper 和 ChainReducer 类。

【讨论】:

    【解决方案3】:

    实际上有很多方法可以做到这一点。我将重点关注两个。

    一个是通过 Riffle (http://github.com/cwensel/riffle) 一个注释库,用于识别依赖的事物并以依赖(拓扑)顺序“执行”它们。

    或者您可以在级联 (http://www.cascading.org/) 中使用级联(和 MapReduceFlow)。未来的版本将支持 Riffle 注释,但现在可以很好地处理原始 MR JobConf 作业。

    对此的一个变体是根本不手动管理 MR 作业,而是使用 Cascading API 开发您的应用程序。然后 JobConf 和作业链在内部通过 Cascading planner 和 Flow 类进行处理。

    通过这种方式,您可以将时间花在解决问题上,而不是花在管理 Hadoop 作业等的机制上。您甚至可以将不同的语言(如 clojure 或 jruby)分层,以进一步简化您的开发和应用程序。 http://www.cascading.org/modules.html

    【讨论】:

      【解决方案4】:

      您可以按照代码中给出的方式运行 MR 链。

      请注意:仅提供了驱动程序代码

      public class WordCountSorting {
      // here the word keys shall be sorted
            //let us write the wordcount logic first
      
            public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
                  //THE DRIVER CODE FOR MR CHAIN
                  Configuration conf1=new Configuration();
                  Job j1=Job.getInstance(conf1);
                  j1.setJarByClass(WordCountSorting.class);
                  j1.setMapperClass(MyMapper.class);
                  j1.setReducerClass(MyReducer.class);
      
                  j1.setMapOutputKeyClass(Text.class);
                  j1.setMapOutputValueClass(IntWritable.class);
                  j1.setOutputKeyClass(LongWritable.class);
                  j1.setOutputValueClass(Text.class);
                  Path outputPath=new Path("FirstMapper");
                  FileInputFormat.addInputPath(j1,new Path(args[0]));
                        FileOutputFormat.setOutputPath(j1,outputPath);
                        outputPath.getFileSystem(conf1).delete(outputPath);
                  j1.waitForCompletion(true);
                        Configuration conf2=new Configuration();
                        Job j2=Job.getInstance(conf2);
                        j2.setJarByClass(WordCountSorting.class);
                        j2.setMapperClass(MyMapper2.class);
                        j2.setNumReduceTasks(0);
                        j2.setOutputKeyClass(Text.class);
                        j2.setOutputValueClass(IntWritable.class);
                        Path outputPath1=new Path(args[1]);
                        FileInputFormat.addInputPath(j2, outputPath);
                        FileOutputFormat.setOutputPath(j2, outputPath1);
                        outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                        System.exit(j2.waitForCompletion(true)?0:1);
            }
      
      }
      

      顺序是

      (JOB1)MAP->REDUCE-> (JOB2)MAP
      这样做是为了对键进行排序,但还有更多方法,例如使用树形图
      然而,我想把你的注意力集中在乔布斯被锁住的方式上!!
      谢谢

      【讨论】:

        【解决方案5】:

        我已经一个接一个地使用 JobConf 对象完成了作业链接。我以 WordCount 为例来链接作业。一项工作是计算一个单词 a 在给定输出中重复了多少次。第二个作业将第一个作业输出作为输入,并计算给定输入中的总字数。下面是需要放在Driver类中的代码。

            //First Job - Counts, how many times a word encountered in a given file 
            JobConf job1 = new JobConf(WordCount.class);
            job1.setJobName("WordCount");
        
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(IntWritable.class);
        
            job1.setMapperClass(WordCountMapper.class);
            job1.setCombinerClass(WordCountReducer.class);
            job1.setReducerClass(WordCountReducer.class);
        
            job1.setInputFormat(TextInputFormat.class);
            job1.setOutputFormat(TextOutputFormat.class);
        
            //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
            FileInputFormat.setInputPaths(job1, new Path("input_data"));
        
            //"first_job_output" contains data that how many times a word occurred in the given file
            //This will be the input to the second job. For second job, input data name should be
            //"first_job_output". 
            FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
        
            JobClient.runJob(job1);
        
        
            //Second Job - Counts total number of words in a given file
        
            JobConf job2 = new JobConf(TotalWords.class);
            job2.setJobName("TotalWords");
        
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(IntWritable.class);
        
            job2.setMapperClass(TotalWordsMapper.class);
            job2.setCombinerClass(TotalWordsReducer.class);
            job2.setReducerClass(TotalWordsReducer.class);
        
            job2.setInputFormat(TextInputFormat.class);
            job2.setOutputFormat(TextOutputFormat.class);
        
            //Path name for this job should match first job's output path name
            FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
        
            //This will contain the final output. If you want to send this jobs output
            //as input to third job, then third jobs input path name should be "second_job_output"
            //In this way, jobs can be chained, sending output one to other as input and get the
            //final output
            FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
        
            JobClient.runJob(job2);
        

        运行这些作业的命令是:

        bin/hadoop jar TotalWords.

        我们需要为命令提供最终的作业名称。在上述情况下,它是 TotalWords。

        【讨论】:

          【解决方案6】:

          您可以使用 oozie 来处理您的 MapReduce 作业。 http://issues.apache.org/jira/browse/HADOOP-5303

          【讨论】:

            【解决方案7】:

            Apache Mahout 项目中有将多个 MapReduce 作业链接在一起的示例。示例之一可以在以下位置找到:

            RecommenderJob.java

            http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

            【讨论】:

              【解决方案8】:

              我们可以利用Job的waitForCompletion(true)方法来定义job之间的依赖关系。

              在我的场景中,我有 3 个相互依赖的工作。在驱动程序类中,我使用了以下代码,它按预期工作。

              public static void main(String[] args) throws Exception {
                      // TODO Auto-generated method stub
              
                      CCJobExecution ccJobExecution = new CCJobExecution();
              
                      Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
                      Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
                      Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);
              
                      System.out.println("****************Started Executing distanceTimeFraudJob ================");
                      distanceTimeFraudJob.submit();
                      if(distanceTimeFraudJob.waitForCompletion(true))
                      {
                          System.out.println("=================Completed DistanceTimeFraudJob================= ");
                          System.out.println("=================Started Executing spendingFraudJob ================");
                          spendingFraudJob.submit();
                          if(spendingFraudJob.waitForCompletion(true))
                          {
                              System.out.println("=================Completed spendingFraudJob================= ");
                              System.out.println("=================Started locationFraudJob================= ");
                              locationFraudJob.submit();
                              if(locationFraudJob.waitForCompletion(true))
                              {
                                  System.out.println("=================Completed locationFraudJob=================");
                              }
                          }
                      }
                  }
              

              【讨论】:

              • 您的答案是关于如何在执行方面加入这些工作。最初的问题是关于最好的数据结构。因此,您的答案与这个特定问题无关。
              【解决方案9】:

              新的类 org.apache.hadoop.mapreduce.lib.chain.ChainMapper 帮助这个场景

              【讨论】:

              • 答案很好 - 但您应该添加更多关于它的作用的详细信息,或者至少添加一个指向 API 参考的链接,以便人们可以投票
              • ChainMapper 和 ChainReducer 用于在 Reduce 之前有 1 个或多个映射器,在 Reduce 之后有 0 个或多个映射器,规范。 (映射器+)减少(映射器*)。如果我明显错了,请纠正我,但我不认为这种方法可以按照 OP 的要求完成串行链接作业。
              【解决方案10】:

              虽然有复杂的基于服务器的 Hadoop 工作流引擎,例如 oozie,但我有一个简单的 Java 库,可以将多个 Hadoop 作业作为工作流执行。定义作业间依赖关系的作业配置和工作流在 JSON 文件中进行配置。一切都是外部可配置的,不需要对现有的 map reduce 实现进行任何更改即可成为工作流的一部分。

              详细信息可以在这里找到。源码和jar可以在github上找到。

              http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

              普拉纳布

              【讨论】:

                【解决方案11】:

                我认为 oozie 可以帮助后续工作直接从前一个工作接收输入。这避免了使用作业控制执行的 I/O 操作。

                【讨论】:

                  【解决方案12】:

                  如果您想以编程方式链接您的作业,您将需要使用 JobControl。用法很简单:

                  JobControl jobControl = new JobControl(name);
                  

                  然后添加 ControlledJob 实例。 ControlledJob 定义了一个作业及其依赖项,从而自动插入输入和输出以适应作业“链”。

                      jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));
                  
                      jobControl.run();
                  

                  启动链。你会想把它放在一个单独的线程中。这允许在链运行时检查链的状态:

                      while (!jobControl.allFinished()) {
                          System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
                          System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
                          System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
                          List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
                          System.out.println("Jobs in success state: " + successfulJobList.size());
                          List<ControlledJob> failedJobList = jobControl.getFailedJobList();
                          System.out.println("Jobs in failed state: " + failedJobList.size());
                      }
                  

                  【讨论】:

                    【解决方案13】:

                    正如您在要求中提到希望 MRJob1 的 o/p 成为 MRJob2 的 i/p 等等,您可以考虑为此用例使用 oozie 工作流。此外,您可能会考虑将中间数据写入 HDFS,因为它将被下一个 MRJob 使用。工作完成后,您可以清理中间数据。

                    <start to="mr-action1"/>
                    <action name="mr-action1">
                       <!-- action for MRJob1-->
                       <!-- set output path = /tmp/intermediate/mr1-->
                        <ok to="end"/>
                        <error to="end"/>
                    </action>
                    
                    <action name="mr-action2">
                       <!-- action for MRJob2-->
                       <!-- set input path = /tmp/intermediate/mr1-->
                        <ok to="end"/>
                        <error to="end"/>
                    </action>
                    
                    <action name="success">
                            <!-- action for success-->
                        <ok to="end"/>
                        <error to="end"/>
                    </action>
                    
                    <action name="fail">
                            <!-- action for fail-->
                        <ok to="end"/>
                        <error to="end"/>
                    </action>
                    
                    <end name="end"/>
                    

                    【讨论】:

                      【解决方案14】:

                      新的答案,因为用 JobClient.run() 确认的答案在新 API 中不起作用:

                      如果你有两个这样的工作:

                      Configuration conf1 = new Configuration();
                      Job job1 = Job.getInstance(conf1, "a");
                      
                      Configuration conf2 = new Configuration();
                      Job job2 = Job.getInstance(conf2, "b");
                      

                      那么您唯一应该做的就是在创建“job2”之前添加以下行:

                      job1.waitForCompletion(true);
                      

                      【讨论】:

                        猜你喜欢
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        相关资源
                        最近更新 更多