Flink作业调度参数详解

本篇文章介绍Flink作业调度命令,以及调度参数的解释

是否使用分离模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-m,–jobmanager 指定提交的jobmanager
-yat,–yarnapplicationType 设置yarn应用的类型
-yD <property=value> 使用给定属性的值
-yd,–yarndetached 使用yarn分离模式
-yh,–yarnhelp yarn session的帮助
-yid,–yarnapplicationId 挂到正在运行的yarnsession上
-yj,–yarnjar Flink jar文件的路径
-yjm,–yarnjobManagerMemory jobmanager的内存(单位M)
-ynl,–yarnnodeLabel 指定 YARN 应用程序 YARN 节点标签
-ynm,–yarnname 自定义yarn应用名称
-yq,–yarnquery 显示yarn的可用资源
-yqu,–yarnqueue 指定yarn队列
-ys,–yarnslots 指定每个taskmanager的slots数
-yt,–yarnship 在指定目录中传输文件
-ytm,–yarntaskManagerMemory 每个taskmanager的内存
-yz,–yarnzookeeperNamespace 用来创建ha的zk子路径的命名空间
-z,–zookeeperNamespace 用来创建ha的zk子路径的命名空间

举个最近项目中实际用到的例子

run \
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-m yarn-cluster \
-ys ${slot_count} \
-ynm ${yarn_app_name} \
-yD taskmanager.memory.managed.fraction=0.2 \
-yjm ${yarnjobManagerMemory} \
-ytm ${yarntaskManagerMemory} \
-c com.itiaoling.app.ods.Kafka2Doris \
-z ${zookeeperNamespace} \
-d /home/appman/realtime_warehouse/jar/Analysis-1.0.jar \
-spring.profiles.active pro \
-sourceSinkType ${SourceSinkEnum} \
-businessType ${BusinessTypeEnum} \
-tableName ${tableName}

flink run -m yarn-cluster -ys 1 -ynm esc_goods_oc_test-mysql2kafka -yD taskmanager.memory.managed.fraction=0.2 -yjm 1024 -ytm 8192 -c com.itiaoling.app.ods.MySqlToKafka -z mysql2kafka_goods_test -d /home/appman/realtime_warehouse/jar/Analysis-2.2.jar -spring.profiles.active pro -sourceSinkType 1 -businessType 3 -tableName isc_goods_oc1_batch

以上运行的jar包为Analysis-2.2.jar,

类为com.itiaoling.app.ods.MySqlToKafka,

给了1个slot

其中为类代码中接受的参数

-spring.profiles.active pro -sourceSinkType 1 -businessType 3 -tableName isc_goods_oc1_batch

taskmanager的内存指定为了8192MB=8G

本地(local)模式,仅开发使用

一般可以使用这种模式进行远程debug

直接在IDEA上运行代码

standalone

  1. flink run 前台运行

/opt/flink/bin/flink run -p 1 -c com.test.TestLocal ./flink-streaming-report-forms-1.0-SNAPSHOT-jar-with-dependencies.jar

  1. flink run后台运行

因上面代码是直接print出来,使用后台模式会报错,这边修改代码,直接写入hdfs,再使用后台模式提交

# 通过 -d 表示后台执行 /opt/flink/bin/flink run -p 1 -c com.test.TestLocal -d ./flink-streaming-report-forms-1.0-SNAPSHOT-jar-with-dependencies.jar

flink run命令执行模板:flink run [option]

1
2
3
4
5
6
7
8
9
10
11
12
13
-c,--class <classname> : 需要指定的main方法的类

-C,--classpath <url> : 向每个用户代码添加url,他是通过UrlClassLoader加载。url需要指定文件的schema如(file://)

-d,--detached : 在后台运行

-p,--parallelism <parallelism> : job需要指定env的并行度,这个一般都需要设置。

-q,--sysoutLogging : 禁止logging输出作为标准输出。

-s,--fromSavepoint <savepointPath> : 基于savepoint保存下来的路径,进行恢复。

-sas,--shutdownOnAttachedExit : 如果是前台的方式提交,当客户端中断,集群执行的job任务也会shutdown。

tips:

flink的slots与yarn 中的container以及cores之间的关系

flink-conf.yaml中有关slots的配置

taskmanager.numberOfTaskSlots: 2 每个taskmanager中的slots数量,在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。

在yarn中提交一个flink任务,container数量计算方式如下:

container.num = taskmanager.num = ( parallelism.default / taskmanager.numberOfTaskSlots

举个例子:

parallelism.default = 2 ,taskmanager.numberOfTaskSlots = 1

提交作业后,还需要1个JobManager,所以总共需要

container为3 = 1(AppMaster + JobManger) +2 (parallelism.default / taskmanager.numberOfTaskSlots )

Flink 中每一个 worker(也就是 TaskManager)都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)


觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

Flink作业调度参数详解
http://yuting0907.github.io/2022/05/26/Flink作业调度参数详解/
作者
Echo Yu
发布于
2022年5月26日
许可协议