【问题标题】:Running a Hadoop Job From another Java Program从另一个 Java 程序运行 Hadoop 作业
【发布时间】:2012-12-10 06:45:31
【问题描述】:

我正在编写一个程序,它接收映射器/缩减器的源代码,动态编译映射器/缩减器并从中生成一个 JAR 文件。然后它必须在 hadoop 集群上运行这个 JAR 文件。

对于最后一部分,我通过代码动态设置了所有必需的参数。但是,我现在面临的问题是代码在编译时需要已编译的 mapper 和 reducer 类。但是在编译时,我没有这些类,它们稍后会在运行时被接收(例如,通过从远程节点接收到的消息)。如果您有任何关于如何解决此问题的想法/建议,我将不胜感激?

您可以在下面找到我最后一部分的代码,问题在于 job.setMapperClass(Mapper_Class.class) 和 job.setReducerClass(Reducer_Class.class) 需要类(Mapper_Class.class 和 Reducer_Class.class)文件编译时在场:

    private boolean run_Hadoop_Job(String className){
try{
    System.out.println("Starting to run the code on Hadoop...");
    String[] argsTemp = { "project_test/input", "project_test/output" };
    // create a configuration
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:54310");
    conf.set("mapred.job.tracker", "localhost:54311");
    conf.set("mapred.jar", jar_Output_Folder+ java.io.File.separator 
                            + className+".jar");
    conf.set("mapreduce.map.class", "Mapper_Reducer_Classes$Mapper_Class.class");
    conf.set("mapreduce.reduce.class", "Mapper_Reducer_Classes$Reducer_Class.class");
    // create a new job based on the configuration
    Job job = new Job(conf, "Hadoop Example for dynamically and programmatically compiling-running a job");
    job.setJarByClass(Platform.class);
    //job.setMapperClass(Mapper_Class.class);
    //job.setReducerClass(Reducer_Class.class);

    // key/value of your reducer output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(argsTemp[0]));
    // this deletes possible output paths to prevent job failures
    FileSystem fs = FileSystem.get(conf);
    Path out = new Path(argsTemp[1]);
    fs.delete(out, true);
    // finally set the empty out path
    FileOutputFormat.setOutputPath(job, new Path(argsTemp[1]));

    //job.submit();
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    System.out.println("Job Finished!");        
} catch (Exception e) { return false; }
return true;
}

已修改:所以我修改了代码以使用 conf.set("mapreduce.map.class, "my mapper.class") 指定映射器和化简器。现在代码编译正确,但执行时会抛出以下内容错误:

ec 2012 年 2 月 24 日上午 6:49:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息:任务 ID:尝试_201212240511_0006_m_000001_2,状态:失败 java.lang.RuntimeException: java.lang.ClassNotFoundException: Mapper_Reducer_Classes$Mapper_Class.class 在 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:809) 在 org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157) 在 org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:569) 在 org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) 在 org.apache.hadoop.mapred.Child.main(Child.java:170)

【问题讨论】:

    标签: java hadoop mapreduce


    【解决方案1】:

    如果你在编译时没有它们,那么直接在配置中设置名称,如下所示:

    conf.set("mapreduce.map.class", "org.what.ever.ClassName");
    conf.set("mapreduce.reduce.class", "org.what.ever.ClassName");
    

    【讨论】:

    • 您必须将Hadoop jar 添加到名为tmpjars 的属性中。所以它会像这样工作:conf.set("tmpjars", "/usr/local/hadoop/hadoop-core.jar,/usr/local/hadoop/hadoop-example.jar)。 jar 路径必须用逗号分隔。请注意,这非常 hacky,您必须注意这些 jars 实际上存在于客户端计算机上(以便 Hadoop 将其复制到 HDFS 并将其下载到 tasktrackers)。
    • 感谢托马斯。我想出了这部分,我的代码现在可以正确编译。但是在执行过程中会抛出一些错误。我修改了我最初的帖子以反映这一点。有什么想法吗?
    • 您是否明确将包含映射器的 jar 添加到 tmpjars 中?
    • 我也试过了。这是我尝试过的选项:1)我在调用java(java -cp requierd_JARs)时指定了所需的Jars,而没有指定conf.set(“tmpjars”,...)。这行得通,作业被提交给 Hadoop,它的事件显示“INFO:map 0% reduce 0%”,但它突然抛出“java.lang.ClassNotFoundException: Mapper_Reducer_Classes$Mapper_Class.class”。 org/apache/hadoop/conf/Configuration"
    • 2)如果我指定所需的 JAR 并使用 conf.set("tmpjars",...) 它会在执行“System.exit(job.waitForCompletion( true) ? 0 : 1);": "java.io.FileNotFoundException: 文件不存在:/Users/me/My_Software/hadoop-0.20.2/hadoop-0.20.2-core.jar"。我仔细检查了该文件确实存在于我的系统上。我认为主要原因是它会检查 HDFS 上的路径。
    【解决方案2】:

    问题是 TaskTracker 看不到本地 jRE 中的类。

    我是这样想的(Maven项目);

    首先,将此插件添加到 pom.xml,它将构建您的应用程序 jar 文件,包括所有依赖项 jar,

     <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>sample</finalName>
                    <!-- 
                    <finalName>uber-${artifactId}-${version}</finalName>
                    -->
                </configuration>
            </plugin>
        </plugins>
      </build>
    

    在java源代码中,添加这些行,它将包含您在pom.xml中通过上面的标签构建到target/sample.jar的sample.jar。

          Configuration config = new Configuration();
          config.set("fs.default.name", "hdfs://ip:port");
          config.set("mapred.job.tracker", "hdfs://ip:port");
    
          JobConf job = new JobConf(config);
          job.setJar("target/sample.jar");
    

    这样,您的任务跟踪器可以引用您编写的类,并且不会发生 ClassNotFoundException。

    【讨论】:

    • 这是最好的答案。您可能不想使用包含 hadoop 作业所需的所有内容的阴影 jar,并将所有这些内容保留在外部 java 程序的类路径中。可能存在 jar 冲突或其他问题。通过路径引用阴影 jar 允许它从外部程序中抽象出来,并通过内置 API 发送到 hadoop 集群。您可以构建一个由外部程序使用的不同 jar,其中仅包含该程序所需的特定依赖项。
    【解决方案3】:

    您只需要对要动态创建的类的 Class 对象的引用。使用Class.for name("foo.Mapper") 而不是foo.Mapper.class

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-12-13
      • 1970-01-01
      • 2019-03-26
      • 2016-03-01
      • 2013-02-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多