本篇文章介绍Flink作业调度命令,以及调度参数的解释
flink yarn-cluster 模式
是否使用分离模式1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| -m,–jobmanager 指定提交的jobmanager -yat,–yarnapplicationType 设置yarn应用的类型 -yD <property=value> 使用给定属性的值 -yd,–yarndetached 使用yarn分离模式 -yh,–yarnhelp yarn session的帮助 -yid,–yarnapplicationId 挂到正在运行的yarnsession上 -yj,–yarnjar Flink jar文件的路径 -yjm,–yarnjobManagerMemory jobmanager的内存(单位M) -ynl,–yarnnodeLabel 指定 YARN 应用程序 YARN 节点标签 -ynm,–yarnname 自定义yarn应用名称 -yq,–yarnquery 显示yarn的可用资源 -yqu,–yarnqueue 指定yarn队列 -ys,–yarnslots 指定每个taskmanager的slots数 -yt,–yarnship 在指定目录中传输文件 -ytm,–yarntaskManagerMemory 每个taskmanager的内存 -yz,–yarnzookeeperNamespace 用来创建ha的zk子路径的命名空间 -z,–zookeeperNamespace 用来创建ha的zk子路径的命名空间
|
举个最近项目中实际用到的例子
run \1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| -m yarn-cluster \ -ys ${slot_count} \ -ynm ${yarn_app_name} \ -yD taskmanager.memory.managed.fraction=0.2 \ -yjm ${yarnjobManagerMemory} \ -ytm ${yarntaskManagerMemory} \ -c com.itiaoling.app.ods.Kafka2Doris \ -z ${zookeeperNamespace} \ -d /home/appman/realtime_warehouse/jar/Analysis-1.0.jar \ -spring.profiles.active pro \ -sourceSinkType ${SourceSinkEnum} \ -businessType ${BusinessTypeEnum} \ -tableName ${tableName}
flink run -m yarn-cluster -ys 1 -ynm esc_goods_oc_test-mysql2kafka -yD taskmanager.memory.managed.fraction=0.2 -yjm 1024 -ytm 8192 -c com.itiaoling.app.ods.MySqlToKafka -z mysql2kafka_goods_test -d /home/appman/realtime_warehouse/jar/Analysis-2.2.jar -spring.profiles.active pro -sourceSinkType 1 -businessType 3 -tableName isc_goods_oc1_batch
|
以上运行的jar包为Analysis-2.2.jar,
类为com.itiaoling.app.ods.MySqlToKafka,
给了1个slot
其中为类代码中接受的参数
-spring.profiles.active pro -sourceSinkType 1 -businessType 3 -tableName isc_goods_oc1_batch
taskmanager的内存指定为了8192MB=8G
本地(local)模式,仅开发使用
一般可以使用这种模式进行远程debug
直接在IDEA上运行代码
standalone
- flink run 前台运行
/opt/flink/bin/flink run -p 1 -c com.test.TestLocal ./flink-streaming-report-forms-1.0-SNAPSHOT-jar-with-dependencies.jar
- flink run后台运行
因上面代码是直接print出来,使用后台模式会报错,这边修改代码,直接写入hdfs,再使用后台模式提交
# 通过 -d 表示后台执行 /opt/flink/bin/flink run -p 1 -c com.test.TestLocal -d ./flink-streaming-report-forms-1.0-SNAPSHOT-jar-with-dependencies.jar
flink run命令执行模板:flink run [option]
1 2 3 4 5 6 7 8 9 10 11 12 13
| -c,--class <classname> : 需要指定的main方法的类
-C,--classpath <url> : 向每个用户代码添加url,他是通过UrlClassLoader加载。url需要指定文件的schema如(file:
-d,--detached : 在后台运行
-p,--parallelism <parallelism> : job需要指定env的并行度,这个一般都需要设置。
-q,--sysoutLogging : 禁止logging输出作为标准输出。
-s,--fromSavepoint <savepointPath> : 基于savepoint保存下来的路径,进行恢复。
-sas,--shutdownOnAttachedExit : 如果是前台的方式提交,当客户端中断,集群执行的job任务也会shutdown。
|
tips:
flink的slots与yarn 中的container以及cores之间的关系
flink-conf.yaml中有关slots的配置
taskmanager.numberOfTaskSlots: 2 每个taskmanager中的slots数量,在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。
在yarn中提交一个flink任务,container数量计算方式如下:
container.num = taskmanager.num = ( parallelism.default / taskmanager.numberOfTaskSlots
举个例子:
parallelism.default = 2 ,taskmanager.numberOfTaskSlots = 1
提交作业后,还需要1个JobManager,所以总共需要
container为3 = 1(AppMaster + JobManger) +2 (parallelism.default / taskmanager.numberOfTaskSlots )
Flink 中每一个 worker(也就是 TaskManager)都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)