Flink任务提交与架构模型(一)
Flink 任务提交模式
Flink分布式计算框架可以基于多种模式部署,每种部署模式下提交任务都有相应的资源管理方式,例如:Flink可以基于Standalone部署模式、基于Yarn部署模式、基于Kubernetes部署模式运行任务,以上不同的集群部署模式下提交Flink任务会涉及申请资源、各角色交互过程,不同模式申请资源涉及到的角色对象大体相同,下面我们以Flink运行时架构流程为例来总体了解下Flink任务提交后涉及到对象交互流程,以便后续学习不同任务提交模式下任务提交流程。
上图是Flink运行时架构流程,涉及集群启动、任务提交、资源申请分配整个流程,大体步骤如下:
启动Flink集群首先会启动JobManager,Standalone集群模式下同时启动TaskManager,该模式资源也就固定;其他集群部署模式会根据提交任务来动态启动TaskManager;
当在客户端提交任务后,客户端会将任务转换成JobGraph提交给JobManager;
JobManager首先启动Dispatcher用于分发作业,运行Flink WebUI提供作业执行信息;
Dispatcher启动后会启动JobMaster并将JobGraph提交给JobMaster,JobMaster会将JobGraph转换成可执行的ExecutionGraph。
JobMaster向对应的资源管理器ResourceManager为当前任务申请Slot资源;
在Standalone资源管理器中会直接找到启动的TaskManager来申请Slot资源,如果资源不足,那么任务执行失败;
其他资源管理器会启动新的TaskManager,新启动的TaskManager会向ResourceManager进行注册资源,然后ResourceManager再向TaskManager申请Slot资源,如果资源不足会启动新的TaskManager来满足资源;
TaskManager为对应的JobMaster offer Slot资源;
JobMaster将要执行的task发送到对应的TaskManager上执行,TaskManager之间可以进行数据交换。
以上就是Flink任务提交的整体流程信息,在Flink中任务提交还有多种模式,不同的Flink集群部署模式支持的任务提交模式不同,对应的任务执行流程略有不同,向Flink集群中提交任务有三种任务部署模式,分别如下:
会话模式 - Session Mode
单作业模式 - Per-Job Mode(过时)
应用模式 - Application Mode
以上三种任务提交模式的主要区别在于Flink集群的生命周期不同、资源的分配方式不同以及Flink 应用程序的main方法执行位置(Client客户端/JobManager)不同。下面分别进行介绍。
会话模式(Session Mode)
Session模式下我们首先会启动一个集群,保持一个会话,这个会话中通过客户端提交作业,集群启动时所有的资源都已经确定,所以所有的提交的作业会竞争集群中的资源。这种模式适合单个作业规模小、执行时间短的大量作业。

优势:只需要一个集群,所有作业提交之后都运行在这一个集群中,所有任务共享集群资源,每个任务执行完成后就释放资源。
缺点:因为集群资源是共享的,所以资源不够了,提交新的作业就会失败,如果一个作业发生故障导致TaskManager宕机,那么所有的作业都会受到影响。
单作业模式(Per-Job Mode)
为了更好的隔离资源,Per-job模式是每提交一个作业会启动一个集群,集群只为这个作业而生,这种模式下客户端运行应用程序,然后启动集群,作业被提交给JobManager,进而分发给TaskManager执行,作业执行完成之后集群就会关闭,所有资源也会释放。

优势:这种模式下每个作业都有自己的JobManager管理,独享当下这个集群的资源,就算作业发生故障,对应的TaskManager宕机也不影响其他作业。如果一个Application有多个job组成,那么每个job都有自己独立的集群。
缺点:每个作业都在客户端向集群JobManager提交,如果一个时间点大量提交Flink作业会造成客户端占用大量的网络带宽,会加重客户端所在节点的资源消耗。
注意:Per-Job 模式目前只有yarn支持,Per-job模式在Flink1.15中已经被弃用,后续版本可能会完全剔除,替代的是Application模式,主要原因就是Application模式把main方法的初始化放到了集群组件的JobManager中,这样对于客户端来说从性能上有很大优化。
应用模式(Application Mode)
Session 模式和Pre-Job模式都是在客户端将作业提交给JobManager,这种方式需要占用大量的网络带宽下载依赖关系并将二进制包发送给JobManager,此外,我们往往提交多个Flink 作业都是在同一个客户端节点,这样更加剧了客户端所在节点的资源消耗,为了降低客户端这种资源消耗,我们可以使用Application Mode。
Application模式与Per-job类似,只是不需要客户端,每个Application提交之后就会启动一个JobManager,也就是创建一个集群,这个JobManager只为执行这一个Flink Application而存在,Application中的多个job都会共用该集群,Application执行结束之后JobManager也就关闭了。这种模式下一个Application会动态创建自己的专属集群(JobManager),所有任务共享该集群,不同Application之间是完全隔离的,在生产环境中建议使用Application模式提交任务。
以上三种Flink任务部署方式生产环境中优先选择Application模式,三者区别总结如下:
Session 模式是先有Flink集群后再提交任务,任务在客户端提交运行,提交的多个作业共享Flink集群;
Per-Job模式和Application模式都是提交Flink任务后创建集群;
Per-Job模式通过客户端提交Flink任务,每个Flink任务对应一个Flink集群,每个任务有很好的资源隔离性;
Application模式是在JobManager上执行main方法,为每个Flink的Application创建一个Flink集群,如果该Application有多个任务,这些Flink任务共享一个集群。
Flink不同的集群部署模式支持不同的任务提交方式,后续小结重点介绍Standalone资源管理和Yarn资源管理任务提交模式的支持。
Flink On Standalone任务提交
Flink On Standalone 即Flink任务运行在Standalone集群中,Standlone集群部署时采用Session模式来构建集群,即:首先构建一个Flink集群,Flink集群资源就固定了,所有提交到该集群的Flink作业都运行在这一个集群中,如果集群中提交的任务多资源不够时,需要手动增加节点,所以Flink 基于Standalone运行任务一般用在开发测试或者企业实时业务较少的场景下。
Flink On Standalone 任务提交支持Session会话模式和Application应用模式,不支持Per-Job单作业模式。下面介绍基于Standalone 的Session会话模式和Application应用模式任务提交命令和原理,演示两类任务提交模式的代码还是以上一章节中读取Socket 数据进行实时WordCount统计代码为例,代码如下:
/**
* 读取Socket数据进行实时WordCount统计
*/
public class SocketWordCount {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取Socket数据
DataStreamSource<String> ds = env.socketTextStream("node5", 9999);
//3.准备K,V格式数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
String[] words = line.split(",");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
//4.聚合打印结果
tupleDS.keyBy(tp -> tp.f0).sum(1).print();
//5.execute触发执行
env.execute();
}
}
将以上代码进行打包,名称为"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node5节点上启动socket服务(nc -lk 9999)。
Standalone Session模式
任务提交命令


再次按照以上命令提交Flink任务可以看到集群中会有2个任务,说明Standalone Session模式下提交的所有Flink任务共享集群资源,如下:
以上提交Flink流任务的名称默认为"Flink Streaming Job",也可以通过参数"pipeline.name"来自定义指定Job 名称,提交命令如下:
```
[root@node4 bin]# ./flink run -m node1:8081 -d -Dpipeline.name=socket-wc -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
```
提交之后,可以看到页面中有三个任务,最后一个任务提交的名称改成了自定义任务名称。
任务提交流程
Standalone Session模式提交任务中首先需要创建Flink集群,集群创建启动的同时Dispatcher、JobMaster、ResourceManager对象一并创建、TaskManager也一并启动,TaskManager会向集群ResourceManager汇报Slot信息,Flink集群资源也就确定了。Standalone Session模式提交任务流程如下:
* 在客户端提交Flink任务,客户端会将任务转换成JobGraph提交给JobManager。
* Dispatcher将提交任务提交给JobMaster。
* JobMaster向ResourceManager申请Slot资源。
* ResourceManager会在对应的TaskManager上划分Slot资源。
* TaskManager向JobMaster offer Slot资源。
* JobMaster将任务对应的task发送到TaskManager上执行。
Standalone Application模式
任务提交命令
Standalone Application模式中不会预先创建Flink集群,在提交Flink 任务的同时会创建JobManager,启动Flink集群,然后需要手动启动TaskManager连接该Flink集群,启动的TaskManager会根据$FLINK\_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以这里选择在node1节点上提交任务并启动JobManager,方便后续其他节点启动TaskManager后连接该节点。Standalone Appliction模式提交任务步骤和命令如下:
1. **准备**Flink jar**包**
在node1节点上将Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK\_HOME/lib目录下。
2. **提交任务,在**node1 ****节点上启动**** JobManager
[root@node1 ~]# cd /software/flink-1.16.0/bin/
#执行如下命令,启动JobManager
[root@node1 bin]# ./standalone-job.sh start --job-classname com.mashibing.flinkjava.code.chapter3.SocketWordCount
执行以上命令后会自动从$FLINK_HOME/lib中扫描所有jar包,执行指定的入口类。命令执行后可以访问对应的Flink WebUI:https://node1:8081,可以看到提交的任务,但是由于还没有执行TaskManager任务无法执行。




任务提交流程
Standalone Application模式提交任务中提交任务的同时会启动JobManager创建Flink集群,但是需要手动启动TaskManager,这样提交的任务才能正常运行,如果提交的任务使用资源多,还可以启动多个TaskManager。Standalone Application模式提交任务流程如下:
在客户端提交Flink任务的同时启动JobManager,客户端会将任务转换成JobGraph提交给JobManager。
Dispatcher会启动JobMaster,Dispatcher将提交任务提交给JobMaster。
JobMaster向ResourceManager申请Slot资源。
手动启动TaskManager,TaskManager会向ResourceManager注册Slot资源
ResourceManager会在对应的TaskManager上划分Slot资源。
TaskManager向JobMaster offer Slot资源。
JobMaster将任务对应的task发送到TaskManager上执行。
Standalone Application模式任务提交流程和Standalone Session模式类似,两者区别主要是Standalone Session模式中启动Flink集群时JobManager、TaskManager、JobMaster会预先启动;Standalone Application模式中提交任务时同时启动集群JobManager、JobMaster,需要手动启动TaskManager。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)