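Flink Production Incident Diary

Problem 1: Exceeded checkpoint tolerable failure threshold

The Flink job failed because checkpointing broke down: Exceeded checkpoint tolerable failure threshold.

Recently, Flink jobs on our real-time platform have been failing frequently with checkpoint-related errors. Initial investigation points to Flink CDC syncing a MySQL table that stores large JSON values and holds about 20 million rows, which makes the job's checkpoints extremely slow.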

2022-12-04 16:26:46,566 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 670223 of job 61103d713243c4a71befb436fa3f32ee expired before completing.
2022-07-16 06:26:46,571 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1934) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1906) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:96) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1990) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_201]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_201]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]

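Before the Exceeded checkpoint tolerable failure threshold. error, the log first reports Checkpoint expired before completing., meaning the checkpoint timed out before it could finish.

Cause:

The checkpoint did not complete within the timeout configured by checkpointTimeout, so it expired.

Solution I:

1. Increase the checkpoint timeout.

2. Use more powerful machines.

3. Reduce the amount of data processed: lower the source parallelism and the window data volume, and raise the sink parallelism.

4. Optimize slow operators (data skew).

5. Configure the number of tolerable checkpoint failures.

execution.checkpointing.tolerable-failed-checkpoints defaults to (none), which as far as I can tell behaves like 0: a single failed checkpoint is enough to fail the job.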

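In the end, after going through the official Flink documentation, I solved the problem by setting execution.checkpointing.tolerable-failed-checkpoints, which controls how many consecutive checkpoint failures are tolerated. For example, with the value set to 3, every failed checkpoint increments continuousFailureCounter; once the counter goes past the 3 tolerated consecutive failures, the job fails and attempts to restart. If any checkpoint succeeds in between, continuousFailureCounter is reset to 0.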
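The counting rule described above can be sketched as a small, self-contained model. This is not Flink's actual class: CheckpointFailureModel and firstFatalCheckpoint are hypothetical names, a checkpoint is treated as failed only when its duration exceeds the timeout, and the job is assumed to fail once the counter exceeds the threshold (my reading of the check in Flink 1.13).

```java
/** Simplified model of consecutive checkpoint failure counting; NOT Flink's real implementation. */
final class CheckpointFailureModel {

    /**
     * Returns the 0-based index of the checkpoint that makes the job fail,
     * or -1 if the job survives every checkpoint in the list.
     */
    static int firstFatalCheckpoint(long[] durationsMs, long timeoutMs, int tolerableFailures) {
        int continuousFailureCounter = 0;
        for (int i = 0; i < durationsMs.length; i++) {
            // A checkpoint that runs longer than the timeout expires, i.e. fails.
            boolean expired = durationsMs[i] > timeoutMs;
            if (expired) {
                continuousFailureCounter++;
                // Assumption: the job fails when the counter exceeds the threshold.
                if (continuousFailureCounter > tolerableFailures) {
                    return i;
                }
            } else {
                // Any successful checkpoint resets the counter.
                continuousFailureCounter = 0;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long timeoutMs = 600_000L; // 10 minutes
        long[] allSlow = {700_000L, 700_000L, 700_000L, 700_000L};
        long[] oneFast = {700_000L, 700_000L, 30_000L, 700_000L, 700_000L};

        System.out.println(firstFatalCheckpoint(allSlow, timeoutMs, 3)); // 3
        System.out.println(firstFatalCheckpoint(allSlow, timeoutMs, 0)); // 0
        System.out.println(firstFatalCheckpoint(oneFast, timeoutMs, 3)); // -1
    }
}
```

With a threshold of 0 the very first expired checkpoint is fatal, which matches the log above where the job failed right after a single expired checkpoint.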

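Previously the job would hit the timeout error about 10 minutes after starting, and the data sync would fail. After the change, the Flink web UI shows the job still running normally 10 minutes in, although one checkpoint did fail, again about 10 minutes after startup and with the same cause: Checkpoint expired before completing.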
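The 10-minute mark lines up with Flink's default checkpoint timeout of 10 minutes. Below is a minimal configuration sketch that raises the timeout and sets the tolerable failure count through the DataStream API. The class name and the concrete values (30-minute timeout, 60 s interval, 30 s minimum pause) are assumptions for illustration, not the settings used in the original job; it needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 s (assumed value) in exactly-once mode.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Give slow checkpoints 30 minutes (assumed value) instead of the default 10.
        checkpointConfig.setCheckpointTimeout(30 * 60 * 1000L);
        // Tolerate up to 3 consecutive failed checkpoints before failing the job.
        checkpointConfig.setTolerableCheckpointFailureNumber(3);
        // Leave at least 30 s (assumed value) between two checkpoints.
        checkpointConfig.setMinPauseBetweenCheckpoints(30_000L);

        // Build the pipeline here, then call env.execute("job-name").
    }
}
```

Raising the timeout only buys time; if checkpoints are slow because of large state or backpressure, the other items in Solution I still apply.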

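Solution II:

With Flink CDC 2.2, syncing MySQL data to Doris still failed.

Setting execution.checkpointing.tolerable-failed-checkpoints = 3 did not help: the Flink CDC job stayed in-progress on its first checkpoint indefinitely. At that point you can try increasing the JobManager memory (-Djobmanager.memory.process.size=1024mb). After increasing it, checkpoints completed normally, but the data still did not reach the target table.

A colleague later found a similar issue on the CDC GitHub repository: when hundreds of millions of rows are suddenly inserted into a MySQL table, Flink CDC cannot sync them.

https://github.com/ververica/flink-cdc-connectors/issues/460

We eventually solved it with an offline batch job: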

package com.itiaoling.app.ods;

import com.alibaba.fastjson.JSONObject;
import com.itiaoling.bean.batch.GoodsEbayCompatibilityModel;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

public class EscMysql2DorisBatch {

    public static void main(String[] args) throws Exception {

        // Checkpoint settings: 10 s interval, exactly-once, keep the last 2 checkpoints.
        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.interval", "10000");
        conf.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
        conf.setString("state.checkpoints.num-retained", "2");
        conf.setString("state.checkpoints.dir", "hdfs:/ns/flink/checkpoints/esc_buyer1_batch");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // For local debugging with the web UI, create the environment this way instead:
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.setParallelism(10);

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table read over JDBC in batches, partitioned by id.
        // Placeholders such as ${username} must be replaced with real values.
        tEnv.executeSql(
                "CREATE TABLE test_multi_jdbc (" +
                        " `id` BIGINT COMMENT ''," +
                        " `dbctime` TIMESTAMP(3) COMMENT ''," +
                        " `dbutime` TIMESTAMP(3) COMMENT ''," +
                        " `extension` STRING COMMENT '') COMMENT 'null'" +
                        " WITH (" +
                        " 'connector' = 'multi-jdbc'," +
                        " 'table-name' = 'test_table.*'," +
                        " 'schema-name' = 'test_database.*'," +
                        " 'username' = '${username}'," +
                        " 'password' = '${password}'," +
                        " 'scan.partition.column' = 'id'," +
                        " 'scan.batch.size' = '100000'," +
                        " 'url' = 'jdbc:mysql://${connection1};jdbc:mysql://${connection2}'" +
                        ")");

        // NOTE: this query reads goods_ebay_compatibility_model, so that table has to be
        // registered with a DDL like the one above (the sample DDL registers test_multi_jdbc).
        Table table = tEnv.sqlQuery("select t.id," +
                "t.site," +
                "t.category_id," +
                "t.esc_category_id," +
                "t.model_id," +
                "t.version," +
                "t.update_type," +
                "DATE_FORMAT(t.create_time, 'yyyy-MM-dd HH:mm:ss') as create_time," +
                "DATE_FORMAT(t.update_time, 'yyyy-MM-dd HH:mm:ss') as update_time," +
                "t.deleted," +
                "0 as is_deleted_flg, " +
                "1673853468700 as ts_ms " +
                "from goods_ebay_compatibility_model t "
        );

        // Convert each row to a JSON string, the format expected by the Doris stream load below.
        DataStream<GoodsEbayCompatibilityModel> ordersDataStream =
                tEnv.toDataStream(table, GoodsEbayCompatibilityModel.class);
        SingleOutputStreamOperator<String> jsonStream = ordersDataStream.map(
                new MapFunction<GoodsEbayCompatibilityModel, String>() {
                    @Override
                    public String map(GoodsEbayCompatibilityModel goodsEbayCompatibilityModel) throws Exception {
                        return JSONObject.toJSON(goodsEbayCompatibilityModel).toString();
                    }
                });

        // jsonStream.print();

        Properties sinkProperties = new Properties();
        sinkProperties.setProperty("format", "json");
        sinkProperties.setProperty("strip_outer_array", "true");

        // Write to Doris in batches of 100000 rows or every 20 s, retrying up to 3 times.
        jsonStream.addSink(
                DorisSink.sink(
                        DorisReadOptions.builder().build(),
                        DorisExecutionOptions.builder()
                                .setBatchSize(100000)
                                .setBatchIntervalMs(20000L)
                                .setMaxRetries(3)
                                .setStreamLoadProp(sinkProperties).build(),
                        DorisOptions.builder()
                                .setFenodes("XXX:8030,XXX:8030,XXX:8030")
                                .setTableIdentifier("ods.ods_esc_buyer_goods_ebay_compatibility_model_bak")
                                .setUsername("XXX")
                                .setPassword("XXX")
                                .build()
                ));

        env.execute("mysql-sink-doris-batch");
    }
}

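Problem 2: DDL changes on the upstream table break the Flink CDC sync job

Flink CDC syncs a MySQL table by reading its binlog. After a DDL change on the MySQL table, Flink CDC can no longer read from its binlog position, and the data sync breaks.

Solution:

Apply the same schema change to the ODS table, then restart the Flink sync job.

Problem 3: The heartbeat…timed out

The Flink job reports the following error: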

Caused by: java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e06_1627962873638_5732_01_000003  timed out.
at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1125)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
... 20 more

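Cause: this error means the container's heartbeat timed out. There are generally two possible reasons:

1. A physical machine in the cluster lost network connectivity. In this case the job usually recovers normally after failover; if it happens rarely, it can be ignored.

2. The TM on the failed-over node has too little memory, and heavy GC causes the heartbeat to time out. Increase the memory configured for that TM.

Solution:

Increase the memory allocated to the Flink job.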
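To make the second cause concrete, here is a small model of how a long GC pause turns into a heartbeat timeout. It is only a sketch: HeartbeatModel and firstTimeoutAt are hypothetical names, not Flink classes, and the 10 s interval and 50 s timeout are assumed values (I believe they match the Flink 1.13 defaults for heartbeat.interval and heartbeat.timeout).

```java
/** Simplified heartbeat monitor model; NOT Flink's HeartbeatMonitorImpl. */
final class HeartbeatModel {

    /**
     * Returns the time (ms) at which the monitor would report a timeout,
     * or -1 if no timeout happens up to observeUntilMs.
     * heartbeatArrivalsMs must be sorted in ascending order.
     */
    static long firstTimeoutAt(long[] heartbeatArrivalsMs, long timeoutMs, long observeUntilMs) {
        // The monitor is armed at t = 0 and re-armed on every heartbeat it receives.
        long deadlineMs = timeoutMs;
        for (long arrivalMs : heartbeatArrivalsMs) {
            if (arrivalMs > deadlineMs) {
                // The heartbeat came too late: the timeout already fired at the deadline.
                return deadlineMs;
            }
            deadlineMs = arrivalMs + timeoutMs;
        }
        return deadlineMs <= observeUntilMs ? deadlineMs : -1L;
    }

    public static void main(String[] args) {
        long timeoutMs = 50_000L;

        // Heartbeats every 10 s, then a GC pause delays the next one until t = 95 s.
        long[] withGcPause = {10_000L, 20_000L, 30_000L, 95_000L};
        System.out.println(firstTimeoutAt(withGcPause, timeoutMs, 120_000L)); // 80000

        // Heartbeats keep arriving every 10 s: no timeout within the observed window.
        long[] healthy = {10_000L, 20_000L, 30_000L, 40_000L};
        System.out.println(firstTimeoutAt(healthy, timeoutMs, 45_000L)); // -1
    }
}
```

In this model the TM is declared lost at 80 s even though it answers again at 95 s, which is why giving the TM more memory (and so shortening GC pauses) addresses the symptom.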



http://yuting0907.github.io/posts/f77448ef.html
Author: Echo Yu
Published: December 5, 2022