Flink CDC MySQL-to-Doris sync: data skipped when Stream Load fails

Problem description

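I use Flink CDC to sync MySQL data to Doris in real time. Originally, changing a table's schema meant stopping the job, applying the schema change to Doris by hand, and then restarting the job, which was tedious. To avoid that I wrote a custom DorisSink. The first version did not implement the CheckpointedFunction interface, and it had a problem: when a checkpoint failed, the data belonging to that checkpoint was simply skipped, so records went missing.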

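One example of what can make a checkpoint fail: a column in the Doris table is declared NOT NULL, which causes the Stream Load to fail. Ideally that batch of Stream Load data should stay pending rather than be lost. In my tests, however, the batch was dropped when the sink did not implement CheckpointedFunction.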
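The failure mode can be reproduced in a toy model. The sketch below is plain Java, not Flink code: the class CheckpointLossDemo, its six-record source, and the "first load attempt is rejected" fault are all assumptions made for illustration. It compares a sink that flushes its buffer as part of the checkpoint with one that lets the checkpoint complete while records are still buffered.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of a replayable source feeding a buffering sink. */
final class CheckpointLossDemo {

    private static final int[] SOURCE = {1, 2, 3, 4, 5, 6};
    private static final int RECORDS_PER_CHECKPOINT = 3;

    /** Runs the model and returns the records that reached the target table. */
    static List<Integer> run(boolean flushOnCheckpoint) {
        List<Integer> table = new ArrayList<>();   // stands in for the Doris table
        List<Integer> buffer = new ArrayList<>();  // batch held in the sink's memory
        int committedOffset = 0;                   // source position stored by the last checkpoint
        int position = 0;                          // current read position
        boolean firstLoadRejected = true;          // assumed fault: the first load attempt fails

        while (position < SOURCE.length) {
            int end = Math.min(position + RECORDS_PER_CHECKPOINT, SOURCE.length);
            while (position < end) {
                buffer.add(SOURCE[position++]);
            }
            if (!flushOnCheckpoint) {
                // The checkpoint completes while the batch is still only in memory.
                committedOffset = position;
            }
            if (firstLoadRejected) {
                // The load fails and the job restarts: memory is lost,
                // and the source rewinds to the last committed offset.
                firstLoadRejected = false;
                buffer.clear();
                position = committedOffset;
                continue;
            }
            table.addAll(buffer);
            buffer.clear();
            // With flush-on-checkpoint the offset advances only after a successful load.
            committedOffset = position;
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println("flush on checkpoint: " + run(true));
        System.out.println("no flush:            " + run(false));
    }
}
```

In this model the flushing variant ends up with all six records, while the non-flushing variant ends up with only 4, 5 and 6: the offset had already moved past records 1 to 3 when their load failed, so they were never replayed.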

Background

DorisSink is a Flink sink function. Its ability to restart from the most recent checkpoint relies mainly on Flink's checkpointing mechanism and Flink's state recovery.

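Specifically, checkpointing first has to be enabled for the job, so that Flink periodically writes all state to a checkpoint and can restore it when the job fails. This is done by setting the relevant parameters on the Flink execution environment, for example: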

env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

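This sets the checkpoint interval to 60 seconds, the mode to exactly-once, the minimum pause between checkpoints to 5 seconds, and the checkpoint timeout to 60 seconds, and allows at most one checkpoint in flight at a time.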
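To see how the interval and the minimum pause interact, here is a small worked calculation. It uses a simplified model, which is an assumption of this sketch rather than Flink's actual scheduler code: with at most one checkpoint in flight, the next checkpoint starts no earlier than one interval after the previous start, and no earlier than the minimum pause after the previous checkpoint completed. The class and method names are hypothetical.

```java
/** Simplified timing model for the settings above; not Flink's scheduler. */
final class CheckpointTiming {

    static final long INTERVAL_MS = 60_000;  // enableCheckpointing(60000)
    static final long MIN_PAUSE_MS = 5_000;  // setMinPauseBetweenCheckpoints(5000)

    /**
     * Earliest start (ms) of the next checkpoint, given when the previous
     * checkpoint started and how long it took to complete.
     */
    static long earliestNextStart(long prevStartMs, long prevDurationMs) {
        long prevEndMs = prevStartMs + prevDurationMs;
        return Math.max(prevStartMs + INTERVAL_MS, prevEndMs + MIN_PAUSE_MS);
    }

    public static void main(String[] args) {
        System.out.println(earliestNextStart(0, 2_000));   // fast checkpoint
        System.out.println(earliestNextStart(0, 58_000));  // slow checkpoint
    }
}
```

In this model a checkpoint that completes in 2 seconds leaves the schedule untouched (next start at 60000 ms), while one that needs 58 seconds pushes the next start to 63000 ms.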

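Next, when the job restarts after a failure, DorisSink relies on Flink's state recovery to restore its state from the most recent checkpoint. For a sink function this happens in CheckpointedFunction.initializeState(). A sketch, in which bufferedRecords is an illustrative ListState<String> field and not part of the DorisSink listed later in this post:

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Register the operator state; on restart Flink refills it from the latest checkpoint
    bufferedRecords = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("buffered-records", String.class));
    if (context.isRestored()) {
        // bufferedRecords.get() now returns what the last successful checkpoint saved
    }
}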

In Apache Flink, implementing the CheckpointedFunction interface lets a function take part in checkpointing and recovery. DorisSink implements the interface's snapshotState() and initializeState() methods to support checkpoints.

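snapshotState() is called each time a checkpoint is taken. In general it saves the function's state to the state backend, that is, the state of all in-flight data (records still buffered, or completed but not yet committed), and Flink uses that checkpoint to restore the state after a failure. In DorisSink, snapshotState() instead writes to Doris every record that the DorisDynamicOutputFormatForJson instances have buffered but not yet committed. If a flush throws, the checkpoint fails rather than completing with unwritten data still sitting in memory.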

initializeState() is where state is restored from the state backend. In DorisSink its body is empty, because DorisDynamicOutputFormatForJson already carries its own recovery logic and nothing needs to be restored by hand.

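In short, snapshotState() and initializeState() are what give DorisSink checkpoint and recovery support: snapshotState() flushes the buffered data to Doris at every checkpoint, and initializeState() is the hook Flink calls at startup to restore state (empty in this sink).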
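For contrast with the flush-based approach, the general pattern described above (save buffered records at snapshot time, put them back at startup) can be sketched without any Flink dependency. Everything here is a hypothetical stand-in: the Checkpointed interface only mimics the shape of snapshotState() and initializeState(), and BufferingSink is not the DorisSink from this post.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the snapshot/restore pattern; the names are illustrative, not Flink APIs. */
final class SnapshotRestoreSketch {

    /** Hypothetical stand-in for the two hooks of a checkpointed function. */
    interface Checkpointed<S> {
        S snapshot();                 // called when a checkpoint is taken
        void initialize(S restored);  // called at startup; null means a fresh start
    }

    /** A sink that buffers records and hands the buffer to the checkpoint. */
    static final class BufferingSink implements Checkpointed<List<String>> {
        private final List<String> buffer = new ArrayList<>();

        void invoke(String record) {
            buffer.add(record);
        }

        @Override
        public List<String> snapshot() {
            // Copy, so later writes do not change the saved state.
            return new ArrayList<>(buffer);
        }

        @Override
        public void initialize(List<String> restored) {
            buffer.clear();
            if (restored != null) {
                buffer.addAll(restored);
            }
        }

        List<String> buffered() {
            return new ArrayList<>(buffer);
        }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.invoke("row-1");
        sink.invoke("row-2");
        List<String> checkpoint = sink.snapshot();
        sink.invoke("row-3");  // arrives after the checkpoint

        BufferingSink restarted = new BufferingSink();  // simulated restart
        restarted.initialize(checkpoint);
        System.out.println(restarted.buffered());
    }
}
```

After the simulated restart the buffer holds row-1 and row-2 only; row-3 arrived after the checkpoint, so in a real job it would have to come back through a replay of the source.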

Solution

The code is as follows.

Note that the sink must implement the CheckpointedFunction interface and override the following two methods:

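@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    // Push every buffered batch to Doris before the checkpoint completes;
    // an exception thrown here fails the checkpoint.
    for (DorisDynamicOutputFormatForJson<String> outputFormat : outputFormatList.values()) {
        outputFormat.flush();
    }
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    // Nothing to restore: the sink keeps no Flink-managed state.
}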

The complete DorisSink.java is as follows:

package com.itiaoling.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.itiaoling.bean.FlinkCdcTableInfoBean;
import com.itiaoling.bean.FlinkConfTableInfoBean;
import com.itiaoling.bean.FlinkHostInfoBean;
import com.itiaoling.function.DorisDynamicOutputFormatForJson;
import com.itiaoling.utils.JdbcUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.types.logical.LogicalType;

import java.sql.Connection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static com.itiaoling.sink.DorisSink.Builder.getProperties;

@Slf4j
public class DorisSink extends RichSinkFunction<Tuple2<String, String>> implements CheckpointedFunction {

    // One output format per source table, keyed by "sourceDb.sourceTable"
    private final Map<String, DorisDynamicOutputFormatForJson<String>> outputFormatList;

    public DorisSink(
            Map<String, DorisDynamicOutputFormatForJson<String>> outputFormatList) {
        this.outputFormatList = outputFormatList;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        for (DorisDynamicOutputFormatForJson<String> outputFormat : outputFormatList.values()) {
            outputFormat.setRuntimeContext(ctx);
            outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        JSONObject rowJson = JSON.parseObject(value.f1);
        FlinkHostInfoBean flinkHostInfo = JSON.parseObject(rowJson.getString("flinkHostInfo"), FlinkHostInfoBean.class);
        FlinkConfTableInfoBean flinkConfTableInfo = JSON.parseObject(rowJson.getString("flinkConfTableInfo"), FlinkConfTableInfoBean.class);
        int id = flinkHostInfo.getId();
        String sinkFe = flinkHostInfo.getSinkHostPort();
        String sinkUsername = flinkHostInfo.getSinkUserName();
        String sinkPassword = flinkHostInfo.getSinkPassword();
        if (rowJson.containsKey("isNewAdd")) {
            // Control message: a table was added to or removed from the sync configuration
            String isNewAdd = rowJson.getString("isNewAdd");
            FlinkCdcTableInfoBean flinkCdcTableInfo = JSON.parseObject(rowJson.getString("data"), FlinkCdcTableInfoBean.class);
            String dorisTable = flinkHostInfo.getSinkDb() + "." + flinkCdcTableInfo.getSinkTableName();
            String dbName = flinkCdcTableInfo.getSourceDb();
            String mysqlTable = flinkCdcTableInfo.getSourceTableName();
            if ("1".equals(isNewAdd)) {
                log.info("have no sink for new table,to create a sink");
                LogicalType[] types = {};
                String[] field = {};
                Properties sinkPro = getProperties();
                if (StringUtils.isNotEmpty(flinkCdcTableInfo.getStrictMode()) && flinkCdcTableInfo.getStrictMode().equals("false")) {
                    sinkPro.setProperty("strict_mode", "false");
                    sinkPro.setProperty("max_filter_ratio", "1");
                }
                DorisOptions.Builder option = Builder.getBuilder(sinkFe, sinkUsername, sinkPassword);
                DorisExecutionOptions.Builder execution = Builder.getBuilder();
                Boolean exists = JdbcUtils.checkDorisExists(dorisTable, flinkHostInfo);
                if (exists) {
                    DorisDynamicOutputFormatForJson<String> newFormat =
                            new DorisDynamicOutputFormatForJson<>(
                                    option.setTableIdentifier(dorisTable).build(),
                                    DorisReadOptions.defaults(),
                                    execution.setStreamLoadProp(sinkPro).build(),
                                    types,
                                    field);
                    if (outputFormatList.containsKey(dbName + "." + mysqlTable)) {
                        outputFormatList.get(dbName + "." + mysqlTable).close();
                    }
                    outputFormatList.put(dbName + "." + mysqlTable, newFormat);
                    RuntimeContext ctx = getRuntimeContext();
                    newFormat.setRuntimeContext(ctx);
                    newFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
                    log.info("create new sink format successfully");
                } else {
                    log.error("not found doris table:" + dorisTable);
                }
            } else {
                // Dynamically shut down the Stream Load for this table
                if (outputFormatList.containsKey(dbName + "." + mysqlTable)) {
                    outputFormatList.get(dbName + "." + mysqlTable).close();
                }
                outputFormatList.remove(dbName + "." + mysqlTable);
            }
        } else if (rowJson.containsKey("data")) {
            // Data message: route the row to the output format of its source table
            JSONObject data = rowJson.getJSONObject("data");
            DorisDynamicOutputFormatForJson<String> format = outputFormatList.get(value.f0);
            if (format != null) {
                format.writeRecord(data.toJSONString());
            } else {
                String queryDb = value.f0.split("\\.")[0];
                String queryTable = value.f0.split("\\.")[1];
                String cdcTable = flinkConfTableInfo.getMysqlDb() + "." + flinkConfTableInfo.getTableInfoTableName();
                FlinkCdcTableInfoBean flinkCDCTable;
                // Close the lookup connection when done, so repeated misses do not leak connections
                try (Connection jdbcConn = JdbcUtils.createJdbcConn(flinkConfTableInfo.getHostname(), flinkConfTableInfo.getPort(), flinkConfTableInfo.getUsername()
                        , flinkConfTableInfo.getPassword(), flinkConfTableInfo.getMysqlDb())) {
                    flinkCDCTable = JdbcUtils.queryTableByTableName(jdbcConn, cdcTable, id, queryDb, queryTable);
                }
                if (flinkCDCTable == null) {
                    log.error("no found table in config cdc table:" + flinkConfTableInfo.getHostname() + " " + cdcTable);
                } else {
                    log.info("this table found in cdc table , renew sink format!");
                    String dorisTableName = flinkHostInfo.getSinkDb() + "." + flinkCDCTable.getSinkTableName();
                    LogicalType[] types = {};
                    String[] field = {};
                    Properties sinkPro = getProperties();
                    if (StringUtils.isNotEmpty(flinkCDCTable.getStrictMode()) && flinkCDCTable.getStrictMode().equals("false")) {
                        sinkPro.setProperty("strict_mode", "false");
                        sinkPro.setProperty("max_filter_ratio", "1");
                    }
                    DorisOptions.Builder option = Builder.getBuilder(sinkFe, sinkUsername, sinkPassword);
                    DorisExecutionOptions.Builder execution = Builder.getBuilder();
                    Boolean exists = JdbcUtils.checkDorisExists(dorisTableName, flinkHostInfo);
                    if (exists) {
                        DorisDynamicOutputFormatForJson<String> newFormat =
                                new DorisDynamicOutputFormatForJson<>(
                                        option.setTableIdentifier(dorisTableName).build(),
                                        DorisReadOptions.defaults(),
                                        execution.setStreamLoadProp(sinkPro).build(),
                                        types,
                                        field);
                        outputFormatList.put(queryDb + "." + queryTable, newFormat);
                        RuntimeContext ctx = getRuntimeContext();
                        newFormat.setRuntimeContext(ctx);
                        newFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
                        log.info("create after renew sink format successfully");
                        newFormat.writeRecord(data.toJSONString());
                    } else {
                        log.error("not found doris table:" + dorisTableName);
                    }
                }
            }
        } else {
            outputFormatList.get(value.f0).writeRecord(value.f1);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        for (DorisDynamicOutputFormatForJson<String> outputFormat : outputFormatList.values()) {
            outputFormat.close();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Push every buffered batch to Doris before the checkpoint completes;
        // an exception thrown here fails the checkpoint.
        for (DorisDynamicOutputFormatForJson<String> outputFormat : outputFormatList.values()) {
            outputFormat.flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        // Nothing to restore: the sink keeps no Flink-managed state.
    }

    public static class Builder {

        private Builder() {
            throw new IllegalStateException("DorisSink Builder class");
        }

        public static DorisSink build(FlinkHostInfoBean flinkHostInfoBean, List<FlinkCdcTableInfoBean> listCDCTables) {
            // Connection settings
            String sinkFe = flinkHostInfoBean.getSinkHostPort();
            String sinkUsername = flinkHostInfoBean.getSinkUserName();
            String sinkPassword = flinkHostInfoBean.getSinkPassword();

            DorisOptions.Builder option = getBuilder(sinkFe, sinkUsername, sinkPassword);
            DorisExecutionOptions.Builder execution = getBuilder();
            LogicalType[] types = {};
            String[] field = {};

            HashMap<String, DorisDynamicOutputFormatForJson<String>> outputMaps =
                    new HashMap<>();

            for (FlinkCdcTableInfoBean listCDCTable : listCDCTables) {
                String dorisTable = listCDCTable.getSinkTableName();
                // Start from the common Stream Load properties for every table, so that one
                // table's strict_mode=false does not carry over to the tables after it
                Properties sinkPro = getProperties();
                if (StringUtils.isNotEmpty(listCDCTable.getStrictMode()) && listCDCTable.getStrictMode().equals("false")) {
                    sinkPro.setProperty("strict_mode", "false");
                    sinkPro.setProperty("max_filter_ratio", "1");
                }
                Boolean exists = JdbcUtils.checkDorisExists(flinkHostInfoBean.getSinkDb() + "." + dorisTable, flinkHostInfoBean);
                if (exists) {
                    outputMaps.put(
                            listCDCTable.getSourceDb() + "." + listCDCTable.getSourceTableName(),
                            new DorisDynamicOutputFormatForJson<>(
                                    option.setTableIdentifier(flinkHostInfoBean.getSinkDb() + "." + dorisTable).build(),
                                    DorisReadOptions.defaults(),
                                    execution.setStreamLoadProp(sinkPro).build(),
                                    types,
                                    field));
                } else {
                    log.error("not found doris table:" + flinkHostInfoBean.getSinkDb() + "." + dorisTable);
                }
            }
            return new DorisSink(outputMaps);
        }

        public static DorisExecutionOptions.Builder getBuilder() {
            return DorisExecutionOptions.builder().setMaxRetries(5).setBatchSize(50000).setBatchIntervalMs(20000L);
        }

        public static DorisOptions.Builder getBuilder(
                String sinkFe, String sinkUsername, String sinkPassword) {
            return DorisOptions.builder()
                    .setFenodes(sinkFe)
                    .setUsername(sinkUsername)
                    .setPassword(sinkPassword);
        }

        public static Properties getProperties() {
            Properties sinkPro = new Properties();
            sinkPro.setProperty("format", "json");
            sinkPro.setProperty("strip_outer_array", "true");
            sinkPro.setProperty("function_column.sequence_col", "seq_col");

            return sinkPro;
        }
    }
}

Permalink: http://yuting0907.github.io/posts/491294c5.html
Author: Echo Yu
Published: April 14, 2023