Restarting a Failed Flink Job
Flink is a highly available big-data computation framework: after a job is interrupted, it can resume from where it left off by restoring from a checkpoint.
Here we deliberately kill the job to test that checkpoint-based recovery.
The script used to kill the Flink job deployed on YARN is as follows:
```bash
#!/bin/bash
# Usage: pass the application name (or a name filter) as the first argument.
# List running YARN applications matching the filter and save them to a temp file.
yarn application --list | grep "$1" > /home/appman/realtime_warehouse/flink-job.txt

while read -r line
do
    if [[ $line == *"Apache Flink"* ]]
    then
        # The yarn output is tab-separated; the first field is the application ID.
        array=(${line//$'\t'/ })
        application_id=${array[0]}
        # Kill the job and remove its HDFS staging directory.
        yarn application --kill "$application_id"
        hadoop fs -rm -r -skipTrash /user/appman/.flink/"$application_id"
    fi
done < /home/appman/realtime_warehouse/flink-job.txt

rm -f /home/appman/realtime_warehouse/flink-job.txt
```
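Assuming the script above is saved as kill-flink-job.sh (a hypothetical file name), it takes the job name as its only argument:

```bash
# Kill the Flink job whose YARN application name matches the filter
./kill-flink-job.sh esc-buyer-hk-kafka2doris
```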
On HDFS, locate the job's checkpoint directory and find the most recent usable chk-* subdirectory. If it contains a _metadata file, the job can be restored from it, as shown below.
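A minimal sketch of that lookup, using the checkpoint path from the restore command below; the job ID directory (033b2677...) and chk number are whatever your own job actually produced:

```bash
# List the chk-* directories for the job; the highest-numbered one is the most recent
hadoop fs -ls hdfs:///ns/flink/checkpoints/kafka-to-doris-esc-buyer/033b26776f576c08f74b4b5368f5ae24/

# Inspect the latest one: it is restorable only if it contains a _metadata file
hadoop fs -ls hdfs:///ns/flink/checkpoints/kafka-to-doris-esc-buyer/033b26776f576c08f74b4b5368f5ae24/chk-825377/
```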
The command to start the Flink job from a checkpoint is as follows:
```bash
flink run -m yarn-cluster \
    -s hdfs:///ns/flink/checkpoints/kafka-to-doris-esc-buyer/033b26776f576c08f74b4b5368f5ae24/chk-825377 \
    -ys 1 -ynm esc-buyer-hk-kafka2doris \
    -yD taskmanager.memory.managed.fraction=0.1 \
    -yjm 1024 -ytm 2048 \
    -c com.itiaoling.app.ods.Kafka2Doris \
    -z esc-buyer-hk-kafka2doris -d \
    /home/appman/realtime_warehouse/jar/Analysis-2.4.jar \
    -spring.profiles.active pro -sourceSinkType 2 -businessType 3 -tableName buyer_kafkatodoris_batch
```
If the checkpoint is invalid, the Flink job will stay stuck in the INITIALIZING state indefinitely, so it is worth verifying the checkpoint before restoring, as sketched below.
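A minimal pre-flight check, assuming the same checkpoint path as in the restore command above; `hadoop fs -test -e` exits 0 when the path exists:

```bash
# Hypothetical pre-flight check: confirm the checkpoint has a _metadata file before restoring
CHK=hdfs:///ns/flink/checkpoints/kafka-to-doris-esc-buyer/033b26776f576c08f74b4b5368f5ae24/chk-825377
if hadoop fs -test -e "$CHK/_metadata"; then
    echo "checkpoint looks restorable: $CHK"
else
    echo "no _metadata in $CHK; fall back to an earlier chk-* directory" >&2
fi
```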