flink-之各种集群模式的运行时架构

flink-之各种集群模式的运行时架构

NOTE : flink的运行时架构是以flink application或者job为单位的,比如每个flink application都会有自己独立的运行时架构,但是不同的运行时架构可能可以共用公共集群资源。

本文参考文献如下。

flink客户端操作

flink开发环境搭建和应用的配置、部署及运行

flink on yarn\k8s原理剖析与实践

本文呢,主要讲解standalone clusteryarn cluster的运行时架构,在客户端提交flink应用之后集群内部是怎么工作的。

0 flink的日志文件存储内容

  • flink-${user}-standalonesession-${id}-${hostname}.log:代码中的日志输出
  • flink-${user}-standalonesession-${id}-${hostname}.out:进程执行时的stdout输出,比如你的wordcount结果。
  • flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM的GC的日志

1 standalone模式

standalone模式,首先得在配置集群环境之后,预先通过bin/start-cluster.sh启动一个集群时基础环境,此时会在master节点和slave节点上分别启动2种不同的进程如下1.1启动JVM进程

具体的standalone的运行时架构如下:
flink-之各种集群模式的运行时架构

1.1 启动JVM进程

[shufang@shufang101 flink-standalone-1.10.1]$ jpsall
================shufang@shufang101================
# master上启动的JVM进程,其中包含StandaloneResourceManager、StandaloneDispatcher2个子进程
30641 StandaloneSessionClusterEntrypoint  
31043 TaskManagerRunner # worker节点上启动的Taskmanager JVM进程
32281 Jps
================shufang@shufang102================
5037 Jps
4687 TaskManagerRunner  # worker节点上启动的Taskmanager JVM进程
================shufang@shufang103================
4738 TaskManagerRunner  # worker节点上启动的Taskmanager JVM进程
5083 Jps 

1.2 集群启动日志分析

首先看启动的日志如下。
flink-之各种集群模式的运行时架构

上图中的步骤可以很清楚的看到。

  • 1、 首先启动一个dispatcher的webui界面
  • 2、 然后就是启动整个StandaloneDispatcher的服务,此时你可以通过webui界面提交flink的jar包
  • 3、 启动了StandaloneResourceManager,并且接受了taskamanager的注册请求,此时Slots已经被StandaloneResourceManager管理起来了。

注意:standalone模式resourcemanager只能管理并且调度预先启动的taskamanager上的资源,这个是固定的,resourcemanager并不能再去启动新的taskmanager

1.3 提交一个单job的flink应用变化

首先通过以下任意命令启动一个flink的应用。

# 1 这是一个stream的应用
bin/flink run -d examples/streaming/TopSpeedWindowing.jar

# 2 这是一个batch应用
bin/flink run examples/streaming/WordCount.jar

flink-之各种集群模式的运行时架构
flink-之各种集群模式的运行时架构

再提交一个应用之后,上图中的步骤可以很清楚的看到。

  • 1、Dispatcher从客户端接收到一个JobGraph,并且提交job;
  • 2、启动JobMaster,每个单独的job都会有自己的JobMaster;
  • 3、启动一个JobManager,作为该应用中所有job对应jobmaster的管理角色;
  • 4、将JobGraph转化成ExecutionGraph,最终交给Scheduler去调度分配给不同的Taskmanager去执行;
  • 5、执行过程中获取到的Task的状态,该状态为FINISHED;
  • 6、关闭checkpoint协调器;
  • 7、ResourceManager断开与JobManager的连接,并开始关闭JobManager子进程,JobManager最终已经处于关闭状态。

注意:所有的JobManager都是启动在基础资源中的StandaloneSessionClusterEntrypoint中的,JobManager启动后,将Dispatcher、Resourcemanager、JobMaster从逻辑上抽象成自身的组件,在flink-1.13的官方文档中有清晰的解释,JobManager最终作为master,其Resourcemanager管理的TaskManager为Worker

flink-之各种集群模式的运行时架构

2 yarn模式

flink on yarn 模式可以分成多种,如per-job,yarn-session,application模式,基础架构如下。

flink-之各种集群模式的运行时架构

2.1 per-job cluster

per-job集群模式的运行时架构如下。

作业提交方式:

# 以per-job提交一个应用
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

...............................................................................
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                          application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

yarn的web页面查看到的aplication的样例。

flink-之各种集群模式的运行时架构

以下是per-job的运行时架构。
flink-之各种集群模式的运行时架构

  • 1、客户端提交flink-app首先是向Yarn-ResourceManager提交,并且将dataflow graph和jar提交到HDFS上;
  • 2、Yarn-ResourceManager会根据提交信息在NodeManager节点上的Container中启动该flink-app对应的ApplicationMaster(YarnJobClusterEntrypoint);
  • 3、ApplicationMaster会接连启动子进程JobManager,包括其组件Dispatcher、Flink-YarnResourcemanager(flink自己内部的资源管理实现)、JobMaster;
  • 4、内部Dispatcher(上图未画出)会根据Container中的jar包jobGraph和提交信息启动JobMaster,然后Flink-YarnResourcemanager会通过ApplicationMaster向Yarn-ResourceManager申请资源,按需分配启动TaskManager(YarnTaskExecutorRunner),这个启动TaskManager的过程是动态的,可以动态启动与停止;
  • 5、最终还是通过JobManager中的Scheduler组件进行Task的任务分配到对应的Taskmanager中的Slots中去执行;
  • 6、在该job执行完之后,所有的资源都会释放(包括JobManager和TaskManager),进入到yarn的资源管理池

2.2 yarn-session模式

yarn-session模式和standalone模式有异曲同工之妙,需要通过bin/yarn-session [option]启动一个yarn的会话,然后提交命令如下:

bin/yarn-session.sh 
-n(--container) 2  #taskmanager的数量
-s (--slots) 2  #taskmanager的slot数量
-jm 1024 #jobmanager的内存
-tm 1024 #taskmanager的内存
-nm test #出现在yarn界面上的名字-AppName
-d #后台运行
     -at,--applicationType <arg>     设置一个自定义的应用类型,如flink、spark、mr...
     -D <property=value>             设置KV类型的属性,如-Dkey=value
     -d,--detached                   设置一个挂起的会话,只有使用的时候才会占用资源
     -h,--help                       查看yarn-session客户端的使用选项
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  指定需要加入类路径下的jar包
     -jm,--jobManagerMemory <arg>    指定JobManager所运行的容器的内存单元-MB
     -m,--jobmanager <arg>           选择JobManager(Master)的连接地址,如 -m yarn
     -nl,--nodeLabel <arg>           为yarn应用指定节点标签
     -nm,--name <arg>                为提交在yarn上的应用设置一个名字
     -q,--query                      显示可用的yarn资源(memory、cores)
     -qu,--queue <arg>               指定应用运行在yarn的哪个调度队列
     -s,--slots <arg>                设置每个Taskmanager的Slots
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   指定TaskManager所运行的容器的内存单元-MB
     -yd,--yarndetached              与-d的作用一样,目前这个选项已经过时了deprecated
     -z,--zookeeperNamespace <arg>   为高可用的模式在zookeeper中创建的子路径

###################在yarn-session中提交应用##################################
bin/flink run -d examples/streaming/TopSpeedWindowing.jar

具体的Yarn-Session Cluster的运行时架构如下。

flink-之各种集群模式的运行时架构

  • 1、首先客户端向Yarn按照资源申请Yarn-Session,此时只会启动ApplicationMaster,进程名为YarnSessionClusterEntrypoint,包括Dispatcher、Flink-YarnResourcemanager,并不会启动TaskManager;
  • 2、当你提交一个flink应用1,此时会通过dispatcher启动一个JobManager_1,这个JobManager与ApplicationMaster是启动在一个容器中;
  • 3、然后通过Flink-YarnResourceManager向YarnResourceManager申请资源,然后ResourceManager再NodeManager上启动容器,启动对应的TaskManager,此时的Task Manager进程的名字为:YarnTaskExecutorRunner
  • 4、然后为每个Job启动对应的JobMaster,然后将容器中的dataflow graph转换成execution graph,然后交给Scheduler去调度分配给YarnTaskExecutorRunner,并向Flink-YarnResourcemanager注册Slots;
  • 5、当你再提交一个flink应用2,此时会通过公用的dispatcher再启动一个JobManager_2,这个JobManager与ApplicationMaster是启动在一个容器中;
  • 6、然后通过Flink-YarnResourceManager向YarnResourceManager申请资源,然后ResourceManager再NodeManager上启动容器,启动对应的TaskManager,此时的Task Manager进程的名字为:YarnTaskExecutorRunner
  • 7、当flink job执行完或者cancel之后,会将YarnTaskExecutorRunner关闭且释放资源,最终将JobManager_1、JobManager_2也会关闭,最终留下一个常驻的进程YarnSessionClusterEntrypoint,其中只包括(Dispatcher、Flink-YarnResourceManager)。

2.3 什么时候使用什么模式?

job执行完或者cancel之后,会将YarnTaskExecutorRunner关闭且释放资源,最终将JobManager_1、JobManager_2也会关闭,最终留下一个常驻的进程YarnSessionClusterEntrypoint,其中只包括(Dispatcher、Flink-YarnResourceManager)。

2.3 什么时候使用什么模式?

Session 模式和 Per Job 模式的应用场景不一样。Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。

上一篇:第 15 节 DataStream之source(scala语言)


下一篇:DataStream API(一)