我是靠谱客的博主 奋斗奇异果,这篇文章主要介绍Flink源码解析系列--StreamExecutionEnvironment类,现在分享给大家,希望可以做个参考。

本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  • 获取一个执行环境(execution environment);
  • 加载/创建初始数据;
  • 指定数据相关的转换;
  • 指定计算结果的存储位置;
  • 触发程序执行。

StreamExecutionEnvironment 是所有 Flink 程序的基础。

创建

你可以使用 StreamExecutionEnvironment 的如下静态方法获取 StreamExecutionEnvironment:

复制代码
1
2
3
4
5
6
getExecutionEnvironment(); createLocalEnvironment(); createRemoteEnvironment(String host, int port, String... jarFiles);

通常,你只需要使用 getExecutionEnvironment() 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行你的程序。

在这里插入图片描述
跟一下 getExecutionEnvironment() 方法:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static StreamExecutionEnvironment getExecutionEnvironment() { return getExecutionEnvironment(new Configuration()); } public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) { // 首先检查当前上下文是否存在可用的 EnvironmentFactory return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) // 若当前上下文存在可用的 EnvironmentFactory,则基于该工厂类创建 ExecutionEnvironment .map(factory -> factory.createExecutionEnvironment(configuration)) // 若工厂类未能创建 ExecutionEnvironment ,则调用 createLocalEnvironment(configuration) 方法创建 LocalStreamEnvironment .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration)); } public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) { // 会判断是否有设置默认并行度 if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) { // 若有设置,则基于配置中的并行度创建 LocalStreamEnvironment return new LocalStreamEnvironment(configuration); } else { // 否则将基于 defaultLocalParallelism 创建 LocalStreamEnvironment // 其中,defaultLocalParallelism 为程序运行节点的核数 Configuration copyOfConfiguration = new Configuration(); copyOfConfiguration.addAll(configuration); copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism); return new LocalStreamEnvironment(copyOfConfiguration); } } private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

数据接入

addSource()

第1种数据接入的方法是 addSource()。

该方法的入参为 SourceFunction 接口的实现类。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) { // 若未传入 Source 名称,则默认名称为 Custom Source // 若应用存在多个 Source,建议手动指定各个 Source 的名称 return addSource(function, "Custom Source"); } public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) { return addSource(function, sourceName, null); } public <OUT> DataStreamSource<OUT> addSource( SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) { // Boundedness 是标识数据源是否无界的枚举 // CONTINUOUS_UNBOUNDED 代表是连续无界的数据源 // BOUNDED 代表是有界的数据源 return addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED); } private <OUT> DataStreamSource<OUT> addSource( final SourceFunction<OUT> function, final String sourceName, @Nullable final TypeInformation<OUT> typeInfo, final Boundedness boundedness) { checkNotNull(function); checkNotNull(sourceName); checkNotNull(boundedness); // 会根据你传入的 SourceFunction 解析出源数据类型 TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo); // 判断是否为 ParallelSourceFunction boolean isParallel = function instanceof ParallelSourceFunction; // 此处会检查传入的 SourceFunction 符合规范 clean(function); // 没啥问题,就将该 SourceFunction 转化为 StreamSource final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); return new DataStreamSource<>( this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness); }

createInput()

第2种数据接入的方法是 createInput()。

该方法的入参为 InputFormat 接口的实现类。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) { // TypeExtractor.getInputFormatTypes(inputFormat) 提取 Source 的类型 return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); } @PublicEvolving public <OUT> DataStreamSource<OUT> createInput( InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) { DataStreamSource<OUT> source; // 判断是否为文件读取的 InputFormat if (inputFormat instanceof FileInputFormat) { @SuppressWarnings("unchecked") FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; source = createFileInput( format, typeInfo, "Custom File source", FileProcessingMode.PROCESS_ONCE, -1); } else { source = createInput(inputFormat, typeInfo, "Custom Source"); } return source; }

当检测到传入的 InputFormat 为 FileInputFormat 时,会走文件读取的方法,即 createFileInput。

createFileInput 的参数中有1个 FileProcessingMode:

复制代码
1
2
3
4
5
6
7
8
9
10
@PublicEvolving public enum FileProcessingMode { // 处理当前目录下现存的文件内容(一次性读取) PROCESS_ONCE, // 周期性扫描目录下的文件以读取新数据(持续性读取) PROCESS_CONTINUOUSLY }

继续跟一下 createFileInput 方法:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private <OUT> DataStreamSource<OUT> createFileInput( FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval) { Preconditions.checkNotNull(inputFormat, "Unspecified file input format."); Preconditions.checkNotNull(typeInfo, "Unspecified output type information."); Preconditions.checkNotNull(sourceName, "Unspecified name for the source."); Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode."); Preconditions.checkArgument( monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) || interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, "The path monitoring interval cannot be less than " + ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms."); // 创建1个周期性扫描目录文件内容的 Function ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>( inputFormat, monitoringMode, getParallelism(), interval); // 定义文件读取工厂类 ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory = new ContinuousFileReaderOperatorFactory<>(inputFormat); // 此处,会根据传入的 FileProcessingMode 来生成 Boundedness // FileProcessingMode.PROCESS_ONCE 对应 Boundedness.BOUNDED 有界 // FileProcessingMode.PROCESS_CONTINUOUSLY 对应 Boundedness.CONTINUOUS_UNBOUNDED 无界 final Boundedness boundedness = monitoringMode == FileProcessingMode.PROCESS_ONCE ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; // 最后调用的还是 addSource() 方法 // SingleOutputStreamOperator 是 DataStreamSource 的父类 SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName, null, boundedness) .transform("Split Reader: " + sourceName, typeInfo, factory); return new DataStreamSource<>(source); }

非文件的 InputFormat 的话,会走另外1个 createInput 方法:

复制代码
1
2
3
4
5
6
7
8
9
private <OUT> DataStreamSource<OUT> createInput( InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) { // 将传入的 InputFormat 转化为 InputFormatSourceFunction InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo); // 最后调用的还是 addSource() 方法 return addSource(function, sourceName, typeInfo); }

InputFormatSourceFunction 为 SourceFunction 接口的实现类,其继承链路如下:

复制代码
1
2
SourceFunction-->ParallelSourceFunction-->RichParallelSourceFunction-->InputFormatSourceFunction

所以,createInput() 本质上调用的还是 addSource() 方法。

fromSource()

第3种数据接入的方法是 fromSource()。

该方法的入参为 Source 接口的实现类。

Flink 1.12 及以后,社区基于 FLIP-27 的改进计划,实现了1种新的 Source 架构。

如果你对 Flink 新的 Source 架构不熟悉的话,可以参阅我之前写好的1篇博客:

Flink进阶系列–FLIP-27新的Source架构

如果你想基于新的 Source 接入数据,则需要通过 StreamExecutionEnvironment.fromSource()。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 第1个参数为 Source 接口的实现类 // 第2个参数为水印生成策略 // 第3个参数为 Source 名称 @PublicEvolving public <OUT> DataStreamSource<OUT> fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName) { return fromSource(source, timestampsAndWatermarks, sourceName, null); } @Experimental public <OUT> DataStreamSource<OUT> fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) { // 基于传入的 Source 提取出数据源的类型 final TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo); return new DataStreamSource<>( this, checkNotNull(source, "source"), checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), checkNotNull(resolvedTypeInfo), checkNotNull(sourceName)); }

任务执行

一旦指定了完整的程序,需要调用 StreamExecutionEnvironment 的 execute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在你的本地机器上触发,或将你的程序提交到某个集群上执行。

execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

如果不想等待作业完成,可以通过调用 StreamExecutionEnvironment 的 executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。如下是使用 executeAsync() 实现 execute() 语义的示例。

复制代码
1
2
3
4
final JobClient jobClient = env.executeAsync(); final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

关于程序执行的最后一部分对于理解何时以及如何执行 Flink 算子是至关重要的。所有 Flink 程序都是延迟执行的:当程序的 main 方法被执行时,数据加载和转换不会直接发生。相反,每个算子都被创建并添加到 dataflow 形成的有向图。当执行被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。程序是在本地执行还是在集群上执行取决于执行环境的类型。

紧接着看一下源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public JobExecutionResult execute() throws Exception { // getStreamGraph() 方法用于获取任务的 StreamGraph return execute(getStreamGraph()); } @Internal public StreamGraph getStreamGraph() { return getStreamGraph(true); } @Internal public StreamGraph getStreamGraph(boolean clearTransformations) { final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate(); if (clearTransformations) { transformations.clear(); } return streamGraph; } private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) { if (transformations.size() <= 0) { throw new IllegalStateException( "No operators defined in streaming topology. Cannot execute."); } // We copy the transformation so that newly added transformations cannot intervene with the // stream graph generation. return new StreamGraphGenerator( new ArrayList<>(transformations), config, checkpointCfg, configuration) .setStateBackend(defaultStateBackend) .setChangelogStateBackendEnabled(changelogStateBackendEnabled) .setSavepointDir(defaultSavepointDirectory) .setChaining(isChainingEnabled) .setUserArtifacts(cacheFile) .setTimeCharacteristic(timeCharacteristic) .setDefaultBufferTimeout(bufferTimeout) .setSlotSharingGroupResource(slotSharingGroupResources); }

可以看出,execute() 提交的是任务的 StreamGraph,DataStreamEnvironment 持有整个任务的 transformations 列表,然后再基于 StreamGraphGenerator 类将 transformations 列表转化为 StreamGraph。

StreamGraphGenerator 类源码后面我们专门出1篇博客详细讲解,此处暂时不做讲解。

execute() 本质上调用的是 execute(StreamGraph streamGraph)。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Internal public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { // 最终还是调用的 executeAsync() 方法 final JobClient jobClient = executeAsync(streamGraph); try { final JobExecutionResult jobExecutionResult; // 如果部署配置为 DeploymentOptions.ATTACHED 连接模式 // 此时,客户端需要等待任务执行完毕,然后调用 getJobExecutionResult().get() 方法获取执行结果 if (configuration.getBoolean(DeploymentOptions.ATTACHED)) { // jobClient.getJobExecutionResult() 返回的是1个 CompletableFuture<JobExecutionResult> jobExecutionResult = jobClient.getJobExecutionResult().get(); // 若部署模式为非连接模式,则客户端无需关注执行结果,异步提交任务之后,直接构造1个 DetachedJobExecutionResult 返回即可 } else { jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID()); } // 获取到任务执行结果之后,依次执行各个任务监听器的 onJobExecuted() 方法 jobListeners.forEach( jobListener -> jobListener.onJobExecuted(jobExecutionResult, null)); return jobExecutionResult; } catch (Throwable t) { Throwable strippedException = ExceptionUtils.stripExecutionException(t); jobListeners.forEach( jobListener -> { jobListener.onJobExecuted(null, strippedException); }); ExceptionUtils.rethrowException(strippedException); // never reached, only make javac happy return null; } }

execute() 最终还是通过 executeAsync() 方法异步提交的任务。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Internal public JobClient executeAsync(StreamGraph streamGraph) throws Exception { checkNotNull(streamGraph, "StreamGraph cannot be null."); checkNotNull( configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file."); final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration); checkNotNull( executorFactory, "Cannot find compatible factory for specified execution.target (=%s)", configuration.get(DeploymentOptions.TARGET)); // 通过 PipelineExecutorFactory 提交 StreamGraph CompletableFuture<JobClient> jobClientFuture = executorFactory .getExecutor(configuration) .execute(streamGraph, configuration, userClassloader); try { JobClient jobClient = jobClientFuture.get(); // 获取到任务提交结果之后,依次执行各个任务监听器的 onJobSubmitted() 方法 jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null)); return jobClient; } catch (ExecutionException executionException) { final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException); jobListeners.forEach( jobListener -> jobListener.onJobSubmitted(null, strippedException)); throw new FlinkException( String.format("Failed to execute job '%s'.", streamGraph.getJobName()), strippedException); } }

整个任务 execute() 过程包含2大步,而且均是异步的。

  • 第1步是异步提交任务,提交之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到提交结果后,执行各个任务监听器的 onJobSubmitted() 方法;
  • 第2步是异步执行任务,调用之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到任务执行结果后,执行各个任务监听器的 onJobExecuted() 方法。

任务监听器 JobListener:

复制代码
1
2
3
4
5
6
7
8
9
10
11
@PublicEvolving public interface JobListener { // 当任务被提交后调用 void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable); // 当任务被执行完后调用 void onJobExecuted( @Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable); }

注册方法为:

复制代码
1
2
3
4
5
6
@PublicEvolving public void registerJobListener(JobListener jobListener) { checkNotNull(jobListener, "JobListener cannot be null"); jobListeners.add(jobListener); }

所以,如果你需要在任务提交之后及任务执行完之后执行某些操作(如日志埋点、回调接口等),可以通过注册 JobListener 的方式轻松实现。

本文到此结束,感谢阅读!

最后

以上就是奋斗奇异果最近收集整理的关于Flink源码解析系列--StreamExecutionEnvironment类的全部内容,更多相关Flink源码解析系列--StreamExecutionEnvironment类内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部