1. The whole Flink job launch starts on the driver side: the user's Environment.execute() call turns the user's operators into a StreamGraph.
2. The StreamGraph is then converted into a JobGraph, which is submitted over remote RPC to the corresponding JobManager endpoint.
3. The JobManager converts it into an ExecutionGraph and calls deploy(), generating TDDs (TaskDeploymentDescriptors) that are sent to the TaskManagers; at that point the whole job is up and running.
Let's look at the driver-side implementation, taking the user's Environment.execute() method as the entry point:
--->>>
Step into the method:
--->>>
Step in again:
private static JobExecutionResult executeRemotely(
        StreamGraph streamGraph,
        ClassLoader envClassLoader,
        ExecutionConfig executionConfig,
        List<URL> jarFiles,
        String host,
        int port,
        Configuration clientConfiguration,
        List<URL> globalClasspaths,
        SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {

    if (LOG.isInfoEnabled()) {
        LOG.info("Running remotely at {}:{}", host, port);
    }

    ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);

    Configuration configuration = new Configuration();
    configuration.addAll(clientConfiguration);
    configuration.setString(JobManagerOptions.ADDRESS, host);
    configuration.setInteger(JobManagerOptions.PORT, port);
    configuration.setInteger(RestOptions.PORT, port);

    final ClusterClient<?> client;
    try {
        client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
    }
    catch (Exception e) {
        throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
            streamGraph.getJobGraph().getJobID(), e);
    }

    client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());

    if (savepointRestoreSettings == null) {
        savepointRestoreSettings = SavepointRestoreSettings.none();
    }

    try {
        // TODO: returns the JobExecutionResult of the submitted job
        return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
            .getJobExecutionResult();
    }
    catch (ProgramInvocationException e) {
        throw e;
    }
    catch (Exception e) {
        String term = e.getMessage() == null ? "." : (": " + e.getMessage());
        throw new ProgramInvocationException("The program execution failed" + term,
            streamGraph.getJobGraph().getJobID(), e);
    }
    finally {
        try {
            client.shutdown();
        } catch (Exception e) {
            LOG.warn("Could not properly shut down the cluster client.", e);
        }
    }
}
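A side note on JobWithJars.buildUserCodeClassLoader: conceptually it builds a URL classloader over the user's jar files and classpath entries, with the environment's classloader as parent, so user classes resolve first through the standard delegation model. The sketch below is an assumption for illustration using only the JDK's URLClassLoader; it is not Flink's actual implementation.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class UserCodeClassLoaderSketch {

    // Hypothetical stand-in for JobWithJars.buildUserCodeClassLoader:
    // combine user jars and extra classpath entries into one URLClassLoader
    // whose parent is the environment's classloader.
    static ClassLoader buildUserCodeClassLoader(List<URL> jarFiles, List<URL> classpaths, ClassLoader parent) {
        List<URL> urls = new ArrayList<>(jarFiles);
        urls.addAll(classpaths);
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = UserCodeClassLoaderSketch.class.getClassLoader();
        ClassLoader ucl = buildUserCodeClassLoader(new ArrayList<>(), new ArrayList<>(), parent);
        // Even with no extra URLs, parent delegation still resolves JDK classes.
        Class<?> c = ucl.loadClass("java.lang.String");
        if (c != String.class) {
            throw new AssertionError("parent delegation failed");
        }
        System.out.println("parent delegation ok");
    }
}
```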
--->>>
Step into client.run().
Here we find that the StreamGraph is converted into a JobGraph.
Looking at the StreamGraph class, we can see it extends StreamingPlan:
org.apache.flink.streaming.api.graph.StreamGraph
--->>>
The concrete conversion method:
--->>>
Going back up a level, we find the concrete implementation that submits the job:
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);

    if (isDetached()) {
        try {
            return jobSubmissionResultFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            ExceptionUtils.checkInterrupted(e);
            throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
        }
    } else {
        final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));

        final JobResult jobResult;
        try {
            jobResult = jobResultFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            ExceptionUtils.checkInterrupted(e);
            throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
        }

        try {
            return jobResult.toJobExecutionResult(classLoader);
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
        } catch (IOException | ClassNotFoundException e) {
            throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
        }
    }
}
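The detached/attached branching above is a plain CompletableFuture composition: in detached mode the client returns as soon as the submission future completes, while in attached mode the submission future is chained, via thenCompose, into a second request for the final job result. A self-contained sketch of that pattern, using only the JDK and hypothetical placeholder methods (submit/requestJobResult here are stand-ins, not Flink APIs):

```java
import java.util.concurrent.CompletableFuture;

public class SubmitPatternSketch {

    // Placeholder for submitJob(jobGraph): completes once the submission is acknowledged.
    static CompletableFuture<String> submit() {
        return CompletableFuture.completedFuture("job-1");
    }

    // Placeholder for requestJobResult(jobId): completes when the job reaches a terminal state.
    static CompletableFuture<String> requestJobResult(String jobId) {
        return CompletableFuture.completedFuture(jobId + ":FINISHED");
    }

    public static void main(String[] args) throws Exception {
        boolean detached = false;
        CompletableFuture<String> submission = submit();

        if (detached) {
            // Detached mode: return as soon as the submission is acknowledged.
            System.out.println("submitted " + submission.get());
        } else {
            // Attached mode: chain a second request and block until the final result arrives.
            String result = submission.thenCompose(jobId -> requestJobResult(jobId)).get();
            System.out.println(result);
        }
    }
}
```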
Now the toJobExecutionResult() method that actually produces the result.
Look at the exception messages it throws; they should be familiar:
public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException {
    if (applicationStatus == ApplicationStatus.SUCCEEDED) {
        return new JobExecutionResult(
            jobId,
            netRuntime,
            AccumulatorHelper.deserializeAccumulators(
                accumulatorResults,
                classLoader));
    } else {
        final Throwable cause;

        if (serializedThrowable == null) {
            cause = null;
        } else {
            cause = serializedThrowable.deserializeError(classLoader);
        }

        final JobExecutionException exception;

        if (applicationStatus == ApplicationStatus.FAILED) {
            exception = new JobExecutionException(jobId, "Job execution failed.", cause);
        } else if (applicationStatus == ApplicationStatus.CANCELED) {
            exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
        } else {
            exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause);
        }

        throw exception;
    }
}
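The branching reduces to a mapping from ApplicationStatus to either a value or a specific exception type; only SUCCEEDED yields a result. A condensed sketch of that mapping with simplified stand-in types (not the actual Flink classes):

```java
public class JobResultMappingSketch {

    // Simplified stand-in for Flink's ApplicationStatus enum.
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Mirrors the branching in toJobExecutionResult(): SUCCEEDED returns a value,
    // every other status maps to an exception with a status-specific message.
    static String toResult(ApplicationStatus status) throws Exception {
        switch (status) {
            case SUCCEEDED:
                return "ok";
            case FAILED:
                throw new Exception("Job execution failed.");
            case CANCELED:
                throw new Exception("Job was cancelled.");
            default:
                throw new Exception("Job completed with illegal application status: " + status + '.');
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(toResult(ApplicationStatus.SUCCEEDED));
        try {
            toResult(ApplicationStatus.CANCELED);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```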
Finally, a quick look at what the JobExecutionResult class is made of:
mainly the jobId, the job's net runtime, and a Map of accumulators: