这个答案是提问者实际提出的问题。我通常会引用它,但会弃权,因为它太大了。
这是有关如何使用Hadoop
Streaming(当前为 1.0.3)链接两个或多个流式作业的文档。
为了理解将执行链接的最终代码并能够
写任何其他连锁工作需要一些初步但实用的理论。
首先,Hadoop 中的工作是什么? Hadoop 作业是
hadoopJob = Configuration + Execution
在哪里,
配置:使执行成为可能的所有设置。
执行:完成所需任务的一组可执行文件或脚本文件。换句话说,就是我们任务的 map 和 reduce 步骤。
Configuration = hadoopEnvironment + userEnvironment
在哪里,
hadoopEnvironment:是Hadoop通用环境的搭建。这个通用环境是从资源定义的,即位于 $HADOOP_HOME/conf 目录中的 xml 文件。例如,一些资源是 core-site.xml、mapred-site.xml 和 hadoop-site.xml,它们分别定义了 hdfs 临时目录、作业跟踪器和集群节点数。
userEnvrironment :是运行作业时用户指定的参数。在 Hadoop 中,这些参数称为选项。
userEnvironment = genericOptions + streamingOptions
在哪里,
genericOptions :它们是通用的,因为它们对每个流式作业都有吸引力,独立于作业。它们由 GenericOptionsParser 处理。
streamingOptions :它们是特定于工作的,因为它们对某个工作有吸引力。例如,每个作业都有自己的输入和输出目录或文件。它们是从 StreamJob 处理的。
示意图,
hadoopJob
/\
/ \
/ \
/ \
/ \
Configuration Execution
/\ |
/ \ |
/ \ executable or script files
/ \
/ \
/ \
hadoopEnvironment userEnvironment
| /\
| / \
| / \
$HADOOP_HOME/conf / \
/ \
genericOptions streamingOptions
| |
| |
GenericOptionsParser StreamJob
任何人都可以看到,以上所有内容都是一系列配置。其中一部分是
对于集群的管理员(hadoopEnvironment),另一部分是
对于集群的用户(userEnvironment)。总而言之,工作主要是
抽象级别的配置,如果我们暂时忘记执行
部分。
我们的代码应该处理上述所有问题。现在我们准备好编写代码了。
首先,什么是代码级别的 Hadoop 作业?它是一个 jar 文件。每当我们
提交一个作业我们提交一个带有一些命令行参数的 jar 文件。例如
当我们运行单个流式作业时,我们执行命令
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
在哪里,
我们的工作是 hadoop-streaming-1.0.3.jar
带有命令行参数 -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
在这个 jar 里面是我们的类,它会妥善处理所有事情。
所以,我们打开一个新的 java 文件,比如 TestChain.java,
// import everything needed
public class TestChain
{
//code here
public static void main( String[] args) throws Exception
{
//code here
}//end main
}//end TestChain
为了处理hadoopEnvironment,我们的类应该继承类Configured。配置类使我们能够访问 Hadoop 的环境和参数,即访问前面提到的资源。资源是 xml 文件,其中包含名称/值对形式的数据。
展望未来,每个界面或多或少都是外部世界与世界想要完成的任务之间的媒介。也就是说,界面是我们用来完成任务的工具。因此,我们的类是一个工具。为此,我们的类必须实现Tool 接口,该接口声明了一个方法run()。这个方法定义了我们的工具行为,当然当接口实现时。最后,为了使用我们的工具,我们使用类ToolRunner。 ToolRunner,通过 GenericOptionsParser 类,也帮助处理来自 userEnvironment 的 genericOptions。
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
为了完成图片,方法run()也称为驱动程序,设置作业,包括作业的初始化和配置。请注意,我们通过 ToolRunner.run() 方法的第一个参数“new Configuration”委托给 ToolRunner 处理 hadoopEnnvironment。
到目前为止我们做了什么?我们只是设置了我们的工具将在其中运行的环境。
现在我们必须定义我们的工具,即进行链接。
只要每个链式作业都是流式作业,我们就这样创建它们中的每一个。我们使用 StreamJob 类的 StreamJob.createJob(String[] args) 方法来做到这一点。 Strings 的 args 矩阵包含每个作业的“命令行”参数。这些命令行参数是指 userEnvironment 的 streamingOptions(job specific)。此外,这些参数采用参数/值对的形式。例如,如果我们的作业有 in.txt 文件作为输入,/out/ 作为输出目录,m.py 作为映射器,r.py 作为减速器,那么,
String[] example = new String[]
{
"-mapper" , "m.py"
"-reducer" , "r.py"
"-input" , "in.txt"
"-output" , "/out/"
}
你必须注意两件事。首先,“-”是必要的。正是这个小东西将参数与值区分开来。这里,mapper 是一个参数,m.py 是它的值。区别从“-”来理解。其次,如果在参数的左"和"-"之间添加一个空格,则忽略此参数。如果我们有"-mapper",则"-mapper"不被视为参数。StreamJob 解析时,args 矩阵看起来对于成对的参数/值。最后一件事,回想一下,作业大致是一个配置。我们期望这样,StreamJob.creatJob() 应该返回一个配置或类似的东西。实际上 StreamJob.createJob() 返回一个@987654328 @object。简而言之,JobConf 对象是 Hadoop 理解并且当然可以执行的特定 mapreduce 作业的描述。
假设我们要链接三个工作,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
//code here
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
//code here
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
此时我们设置了我们的工具将要运行的环境和
我们定义了它的行为。然而,我们还没有付诸行动。 ToolRunner 不是
足够。 ToolRunner,作为一个整体运行我们的工具。它不运行个人
连锁工作。我们必须这样做。
有两种方法可以做到这一点。第一种是使用JobClient,第二种是使用JobControl。
第一种方式 - JobClient
使用 JobClient,我们将链作业作为一个序列运行,一个作业一个接一个地运行
为每个作业调用 JobClient。运行每个单独作业的方法是
JobClient.runJob(jobtorun) 其中 jobtorun 是一个 JobConf 对象。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
使用 JobClient 的这种方式的一个优点是作业进度会打印在标准输出上。
JobClient 的一个缺点是它无法处理作业之间的依赖关系。
第二种方式——JobControl
使用 JobControl,所有链式作业都是一组作业的一部分。在这里,每个作业都在该组的框架中执行。这意味着必须首先将每个链作业添加到组中,然后运行该组。该组是一个 FIFO,或者该组中每个作业的执行都遵循 FCFS(先到先服务)模式。使用 JobControl.addJob(jobtoadd) 方法将每个作业添加到组中。
JobControl 可以通过 x.addDependingJob(y) 方法处理依赖关系,其中
工作 x 取决于工作 y。这意味着,在作业 y 完成之前,作业 x 无法运行。
如果工作 x 依赖于工作 y 和 z 并且 z 独立于 y,那么
使用 x.addDependingJob(y) 和 x.addDependingJob(z) 我们可以表达这些
依赖关系。
JobControl 与 JobClient 相矛盾,与 Job 对象“工作”。当我们打电话
例如 x.addDependingJob(y) 方法,x, y 是 Job 对象。同样成立
对于 JobControl.addJob(jobtoadd),jobtoadd 是一个 Job 对象。每个 Job 对象是
从 JobConf 对象创建。回到我们拥有的代码,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
在上面的代码中,请注意 job3 依赖于 job2。如您所见,job3的输入
是job2的输出。这个事实是一个依赖。 job3 一直等到 job2 完成。
到目前为止,我们只是在组中添加了链作业并描述了它们的依赖关系。
我们需要最后一件事来运行这组作业。
蛮力说只调用方法 JobControl.run()。虽然,这
方法有效,这是有问题的。当连锁作业完成后,整个
工作仍然永远运行。一种有效的方法是定义一个新的
从我们的作业执行的线程 已经存在的线程(当作业运行时)。
然后我们可以等到链作业完成后再退出。与此同时
链作业的执行我们可以询问作业执行信息,例如
有多少工作已经完成,或者我们可以找到一个工作是否处于无效状态,并且
就是这个。
这种使用 JobControl 的方式的一个优点是可以处理许多
作业之间可能存在的依赖关系。
JobControl 的一个缺点是作业进度不会打印在
标准输出,它不是直接呈现的。无论工作失败还是
成功,不会打印任何有用的信息。您必须检查 Hadoop 的 Web UI
或在下面的 while 循环中添加一些代码来跟踪作业的状态或任何内容
需要。最后,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
Thread runjobc = new Thread( jobc);
runjobc.start();
while( !jobc.allFinished())
{
//do whatever you want; just wait or ask for job information
}
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
错误
本节讨论可能发生的一些错误。在下面的错误消息中有一个类 OptimizingJoins。这个类只是用来演示各种错误的类,与本次讨论无关。
尝试编译时包不存在。
这是类路径的问题。编译like(以添加hadoop-streaming-1.0.3.jar包为例),
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
并添加任何缺少的包。
java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
总误差为,
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more
这是我们jar文件的manifest文件的问题。当我们编译我们的工作时
上面的方式,一切都很好。 Java 编译器会找到它需要的任何东西。但
当我们通过命令在 Hadoop 中运行我们的作业时
$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain
然后运行我们的 jar 的 JVM 找不到 StreamJob。为了解决这个问题,当我们
创建jar文件,我们在jar中放入一个包含类的清单文件-
StreamJob 的路径。实际上,
MANIFEST.MF
Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)
请注意,MANIFEST.MF 文件总是以空行结尾。我们的清单.MF
文件有 4 行,而不是 3 行。然后我们创建 jar 文件,例如,
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class
ERROR streaming.StreamJob:无法识别的选项:-D
出现此错误是因为 StreamJob 无法解析 -D 选项。 StreamJob 可以解析
仅流式传输,特定于作业的选项,-D 是通用选项。
这个问题有两种解决方案。第一个解决方案是使用 -jobconf
选项而不是 -D。第二种解决方案是通过解析 -D 选项
GenericOptionsParser 对象。当然,在第二种解决方案中,您必须删除
StreamJob.createJob() 参数中的 -D 选项。
举个例子,第二种解决方案的“干净”代码实现是,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain
{
public class Job1 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
return 0;
}//end run
}
public class Job2 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
return 0;
}//end run
}
public class Job3 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
}
public static void main( String[] args) throws Exception
{
TestChain tc = new TestChain();
//Domination
String[] j1args = new String[]
{
"-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
"-D", "mapred.text.key.comparator.options=-k1,1" ,
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);
//Cost evaluation
String[] j2rgs = new String[]
{
"-D", "mapred.reduce.tasks=12 " ,
"-D", "mapred.text.key,partitioner.options=-k1,1"
};
// Let ToolRunner handle generic command-line options
int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);
//Minimum Cost
String[] j3args = new String[]
{
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args);
System.exit( mres);
}
}//end TestChain
在上面的代码中,我们定义了一个全局类TestChain,它封装了
连锁工作。然后我们定义每个单独的链作业,即我们定义它的运行方法。
每一个链式作业都是一个继承Configured并实现Tool的类。最后,
在 TestChain 的 main 方法中,我们按顺序运行每个作业。请注意,在运行之前
我们定义其通用选项的任何链作业。
编译
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
罐子
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Dom.class TestChain\$Cost.class TestChain\$Min.class
错误 security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://localhost:54310/user/hduser/whateverFile 匹配 0 个文件
当我们使用 JobControl 时会发生此错误。例如,如果一个作业将前一个作业的输出作为输入,那么如果该输入-输出文件不存在,则会发生此错误。 JobControl 以“并行”的方式运行所有独立的作业,而不是像 JobClient 那样一一进行。因此,Jobcontrol 尝试运行输入文件不存在的作业,因此失败。
为了避免这种情况,我们使用 x.addDependingJob(y) 声明这两个作业之间存在依赖关系,作业 x 依赖于作业 y。现在,JobControl 不会尝试在并行相关作业中运行。