1. A Flink job starts on the driver side: the user's `Environment.execute()` call translates the user's operators into a StreamGraph.

2. The StreamGraph is turned into a JobGraph, which is then submitted over remote RPC to the corresponding JobManager endpoint.

3. The JobManager converts it into an ExecutionGraph and calls `deploy()`, generating TaskDeploymentDescriptors (TDDs) that are sent to the TaskManagers; at that point the whole job is up and running.

Let's look at the driver-side implementation, taking the user's `Environment.execute()` call as the entry point:

 

Flink 1.9 Source Code Study 04 ---- Job Startup on the Driver Side

--->>>

Stepping into the method:

(source screenshot not preserved)

--->>>

Stepping in again, we reach `executeRemotely()`:

private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
   ClassLoader envClassLoader,
   ExecutionConfig executionConfig,
   List<URL> jarFiles,
   String host,
   int port,
   Configuration clientConfiguration,
   List<URL> globalClasspaths,
   SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
   if (LOG.isInfoEnabled()) {
      LOG.info("Running remotely at {}:{}", host, port);
   }

   // Build a classloader that can load the user's jars and classpath entries.
   ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);

   // Start from the client configuration and point it at the remote JobManager.
   Configuration configuration = new Configuration();
   configuration.addAll(clientConfiguration);

   configuration.setString(JobManagerOptions.ADDRESS, host);
   configuration.setInteger(JobManagerOptions.PORT, port);

   configuration.setInteger(RestOptions.PORT, port);

   // The REST-based cluster client talks to the JobManager's REST endpoint.
   final ClusterClient<?> client;
   try {
      client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
   }
   catch (Exception e) {
      throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
         streamGraph.getJobGraph().getJobID(), e);
   }

   client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());

   if (savepointRestoreSettings == null) {
      savepointRestoreSettings = SavepointRestoreSettings.none();
   }

   try {
      // Submit the StreamGraph and block for the resulting JobExecutionResult.
      return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
         .getJobExecutionResult();
   }
   catch (ProgramInvocationException e) {
      throw e;
   }
   catch (Exception e) {
      String term = e.getMessage() == null ? "." : (": " + e.getMessage());
      throw new ProgramInvocationException("The program execution failed" + term,
         streamGraph.getJobGraph().getJobID(), e);
   }
   finally {
      try {
         client.shutdown();
      } catch (Exception e) {
         LOG.warn("Could not properly shut down the cluster client.", e);
      }
   }
}
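For reference, the three options the method sets programmatically correspond to the following keys in `flink-conf.yaml` (the host and port values below are placeholders, not values from this walkthrough):

```yaml
# Equivalent flink-conf.yaml keys for the options set in executeRemotely():
jobmanager.rpc.address: jobmanager-host   # JobManagerOptions.ADDRESS
jobmanager.rpc.port: 6123                 # JobManagerOptions.PORT
rest.port: 8081                           # RestOptions.PORT
```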

--->>>

Stepping into `client.run()`, we find the place where the StreamGraph is converted into a JobGraph:

(source screenshot not preserved)

Looking at the StreamGraph class (`org.apache.flink.streaming.api.graph.StreamGraph`), we see that it extends StreamingPlan:

(source screenshot not preserved)

 

--->>>

The concrete conversion happens in `StreamGraph.getJobGraph()`, which delegates to `StreamingJobGraphGenerator.createJobGraph()` (screenshot not preserved):
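Since the screenshot is lost, here is the best-known part of that conversion: operator chaining, where StreamingJobGraphGenerator merges adjacent operators into a single JobVertex when they can run in the same task. The sketch below is a deliberately simplified, self-contained model of the idea; it chains purely on equal parallelism, whereas Flink's real `isChainable()` check also inspects the partitioner, the chaining strategy, and more:

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingSketch {

    // A stand-in for a StreamGraph node: operator name plus parallelism.
    static final class Node {
        final String name;
        final int parallelism;
        Node(String name, int parallelism) { this.name = name; this.parallelism = parallelism; }
    }

    // Greedily chain consecutive operators with the same parallelism into
    // one "JobVertex". A toy model of what StreamingJobGraphGenerator does.
    static List<List<String>> chain(List<Node> pipeline) {
        List<List<String>> vertices = new ArrayList<>();
        int lastParallelism = -1;
        for (Node n : pipeline) {
            if (!vertices.isEmpty() && n.parallelism == lastParallelism) {
                vertices.get(vertices.size() - 1).add(n.name);  // extend the current chain
            } else {
                List<String> vertex = new ArrayList<>();
                vertex.add(n.name);
                vertices.add(vertex);                            // start a new vertex
            }
            lastParallelism = n.parallelism;
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<Node> pipeline = List.of(
            new Node("source", 4), new Node("map", 4),
            new Node("keyed-agg", 2), new Node("sink", 2));
        // source+map chain together; the parallelism change starts a new vertex
        System.out.println(chain(pipeline));  // [[source, map], [keyed-agg, sink]]
    }
}
```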

 

--->>>

Going back up a level, we find the concrete implementation of job submission:

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
   final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);

   if (isDetached()) {
      try {
         // Detached mode: return as soon as the cluster acknowledges the submission.
         return jobSubmissionResultFuture.get();
      } catch (InterruptedException | ExecutionException e) {
         ExceptionUtils.checkInterrupted(e);

         throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
      }
   } else {
      // Attached mode: once the submission is acknowledged, request the final job result.
      final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
         (JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));

      final JobResult jobResult;
      try {
         jobResult = jobResultFuture.get();
      } catch (InterruptedException | ExecutionException e) {
         ExceptionUtils.checkInterrupted(e);

         throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
      }

      try {
         // Convert the JobResult into a JobExecutionResult; rethrows the job's failure cause, if any.
         return jobResult.toJobExecutionResult(classLoader);
      } catch (JobExecutionException e) {
         throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
      } catch (IOException | ClassNotFoundException e) {
         throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
      }
   }
}
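The detached/attached split above is just CompletableFuture composition. Below is a minimal, self-contained model of the control flow; the `String`-typed futures are hypothetical stand-ins for Flink's `JobSubmissionResult` and `JobResult`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SubmitFlowSketch {

    static CompletableFuture<String> submit(String jobId) {
        // Stand-in for submitJob(jobGraph): completes once the cluster
        // has acknowledged the submission.
        return CompletableFuture.completedFuture(jobId);
    }

    static CompletableFuture<String> requestJobResult(String jobId) {
        // Stand-in for requestJobResult(jobID): completes when the job finishes.
        return CompletableFuture.completedFuture("finished " + jobId);
    }

    static String run(String jobId, boolean detached) throws InterruptedException, ExecutionException {
        CompletableFuture<String> ack = submit(jobId);
        if (detached) {
            // Detached mode: done as soon as the submission is acknowledged.
            return "submitted " + ack.get();
        }
        // Attached mode: chain the result request onto the acknowledgment,
        // mirroring the thenCompose(...) call in submitJob above.
        return ack.thenCompose(SubmitFlowSketch::requestJobResult).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run("job-1", true));   // submitted job-1
        System.out.println(run("job-1", false));  // finished job-1
    }
}
```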

The toJobExecutionResult() method that does the actual conversion:

Look at the exception messages thrown here; they should look familiar:

public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException {
   if (applicationStatus == ApplicationStatus.SUCCEEDED) {
      // Success: package the job id, net runtime and deserialized accumulators.
      return new JobExecutionResult(
         jobId,
         netRuntime,
         AccumulatorHelper.deserializeAccumulators(
            accumulatorResults,
            classLoader));
   } else {
      // Failure: deserialize the cause (if any) and rethrow with a matching exception type.
      final Throwable cause;

      if (serializedThrowable == null) {
         cause = null;
      } else {
         cause = serializedThrowable.deserializeError(classLoader);
      }

      final JobExecutionException exception;

      if (applicationStatus == ApplicationStatus.FAILED) {
         exception = new JobExecutionException(jobId, "Job execution failed.", cause);
      } else if (applicationStatus == ApplicationStatus.CANCELED) {
         exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
      } else {
         exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause);
      }

      throw exception;
   }
}

As an aside, let's look at what the JobExecutionResult class is made of.

It mainly holds the job id, the job's net runtime, and a Map of accumulator results.
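A stripped-down, self-contained model of that state (field types simplified for illustration; Flink uses a `JobID` and wraps accumulator values differently):

```java
import java.util.Map;

public class ResultSketch {

    // Simplified model of JobExecutionResult's main fields.
    static final class JobExecutionResultModel {
        final String jobId;                      // JobID in Flink
        final long netRuntimeMillis;             // net job runtime
        final Map<String, Object> accumulators;  // accumulator name -> final value

        JobExecutionResultModel(String jobId, long netRuntimeMillis, Map<String, Object> accumulators) {
            this.jobId = jobId;
            this.netRuntimeMillis = netRuntimeMillis;
            this.accumulators = accumulators;
        }

        @SuppressWarnings("unchecked")
        <T> T getAccumulatorResult(String name) {
            return (T) accumulators.get(name);
        }
    }

    public static void main(String[] args) {
        JobExecutionResultModel r =
            new JobExecutionResultModel("job-1", 1500L, Map.of("records", 42L));
        System.out.println(r.jobId + " ran " + r.netRuntimeMillis + "ms, records="
            + r.<Long>getAccumulatorResult("records"));
    }
}
```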
