Flink's dataflow graph is generated in stages: simple execution plan --> StreamGraph --> JobGraph --> ExecutionGraph --> physical execution graph. The first three stages (everything before the ExecutionGraph) are generated on the client. The ExecutionGraph is the parallelized version of the JobGraph and is generated on the JobManager (master). The physical execution graph is only an abstract concept; the actual execution runs in parallel on multiple workers (slaves).

Principle Analysis

    Generating the Flink dataflow graph: the simple execution plan

  Flink borrows the approach of traditional relational databases, which generate and optimize an execution plan when running SQL. Before the concrete dataflow graph is built, an execution plan is produced; only when the program calls the execute() method is the dataflow graph actually generated and the job run.

  First, Flink loads the data source, reads the configuration, and obtains parameters such as the parallelism. The operators of the WordCount job are planned as follows:

- source: its Transformation type is SourceTransformation and its operatorName is Source.
- flatMap: the user overrides the built-in flatMap function to split each line into words. Flink records its configuration parameters, its parallelism, its output type Tuple2<String, Integer>, its operatorName Flat Map, and its Transformation type OneInputTransformation.
- keyBy(0): the 0 refers to the String field of Tuple2<String, Integer>, meaning the stream is repartitioned by word. Its parallelism is 4, its operatorName is partition, its Transformation type is PartitionTransformation, and its output type is Tuple2<String, Integer>.
- sum(1): sums field 1 (the count) of all records that share the same key. Its parallelism is 4, its operatorName is Keyed Aggregation, its output type is Tuple2<String, Integer>, and its Transformation type is OneInputTransformation.
- sink: finally the results are output. Its parallelism is 4, its output type is Tuple2<String, Integer>, its operatorName is Sink, and its Transformation type is SinkTransformation.
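The repartition-then-aggregate step can be pictured outside of Flink. Below is a minimal plain-Java sketch (the class and method names are made up for illustration; this is not Flink's API) of what keyBy(0).sum(1) computes over a stream of (word, 1) records:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch (plain Java, not Flink's API) of what keyBy(0).sum(1)
// computes over (word, 1) records: a per-key running sum of field 1.
public class KeyedSumSketch {
    public static Map<String, Integer> sumByKey(String[][] records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String[] r : records) {
            // field 0 is the key (the word); field 1 is the value to sum
            sums.merge(r[0], Integer.parseInt(r[1]), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"to", "1"}, {"be", "1"}, {"or", "1"},
            {"not", "1"}, {"to", "1"}, {"be", "1"}
        };
        System.out.println(sumByKey(records)); // {to=2, be=2, or=1, not=1}
    }
}
```

In Flink the same computation runs partitioned across 4 parallel tasks, each task holding the running sums for its share of the keys.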

Source Code

Take WordCount.java as an example:

package org.apache.flink.streaming.examples.wordcount;

import java.text.SimpleDateFormat;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {
    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
    public static long time = 0;

    public static void main(String[] args) throws Exception {
        // Checking input parameters
        LOG.info("set up the execution environment: start= " + df.format(System.currentTimeMillis()));
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get the input data: from a file if --input is given, else from the built-in sample
        DataStream<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer()).keyBy(0).sum(1);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        env.execute("Streaming WordCount");
    }

    /**
     * Splits each line into lower-case words and emits a (word, 1) pair per word.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
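As a quick sanity check of the Tokenizer's splitting rule: it splits on the regular expression \W+ (runs of non-word characters), so punctuation is stripped along with whitespace. The same logic can be run outside Flink (plain Java; the class name here is made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone check of the Tokenizer's splitting logic: lower-case the
// line, split on runs of non-word characters, drop empty tokens.
public class TokenizerCheck {
    public static List<String> tokenize(String value) {
        List<String> out = new ArrayList<>();
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be, or not to be!"));
        // [to, be, or, not, to, be]
    }
}
```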

  When a Flink program runs, it first builds the execution plan it needs, similar to lazy loading of data: nothing actually runs until the execute() method is called. The first call to execute is the creation of the source; with no --input argument this is

text = env.fromElements(WordCountData.WORDS);

  This call loads the source data and fills in the source's attribute information, including the source's Transformation type, parallelism, and output type. The source code is as follows:

public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
    TypeInformation<OUT> typeInfo;
    try {
        typeInfo = TypeExtractor.getForObject(data[0]);
    } catch (Exception e) {
        throw new RuntimeException("Could not create TypeInformation for type "
                + data[0].getClass().getName(), e);
    }
    return fromCollection(Arrays.asList(data), typeInfo);
}

public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
    // verify the collection elements match the declared type
    FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
    SourceFunction<OUT> function;
    try {
        function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
    } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
    // a collection source is always non-parallel
    return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    boolean isParallel = function instanceof ParallelSourceFunction;
    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function);
    }
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

public DataStreamSource(StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
        boolean isParallel, String sourceName) {
    // wrap the operator in a SourceTransformation recording name, output type and parallelism
    super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
    this.isParallel = isParallel;
    if (!isParallel) {
        setParallelism(1);
    }
}
Obtaining the source information
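The listings above show the pattern that repeats for every operator: each API call only records a Transformation (name, operator, output type, parallelism) on the client instead of running anything. A toy model of that lazy plan building (plain Java; all names here are hypothetical, not Flink's classes):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, not Flink's classes) of lazy plan building:
// each API call only records a transformation with a name and parallelism;
// nothing runs until an execute() call would walk this list.
public class PlanSketch {
    static class Transformation {
        final String name;
        final int parallelism;
        Transformation(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    final List<Transformation> transformations = new ArrayList<>();

    PlanSketch add(String name, int parallelism) {
        transformations.add(new Transformation(name, parallelism));
        return this; // fluent chaining, like the DataStream API
    }

    public static void main(String[] args) {
        PlanSketch plan = new PlanSketch()
                .add("Source", 1)           // non-parallel collection source
                .add("Flat Map", 4)
                .add("partition", 4)        // keyBy
                .add("Keyed Aggregation", 4)
                .add("Sink", 4);
        for (Transformation t : plan.transformations) {
            System.out.println(t.name + " (parallelism=" + t.parallelism + ")");
        }
    }
}
```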
