Flink程序从checkpoint点启动-广播流修改问题

原本的processBroadcastElement函数逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {

String tableInfo = JSON.parseObject(value.f1).getString("data");
FlinkCdcTableInfoBean flinkCdcTableInfoBean = JSON.parseObject(tableInfo, FlinkCdcTableInfoBean.class);

String key = flinkCdcTableInfoBean.getSourceDb() + "_" + flinkCdcTableInfoBean.getSourceTableName() + "_" + flinkCdcTableInfoBean.getId();


BroadcastState<String, FlinkCdcTableInfoBean> broadcastState=ctx.getBroadcastState(mapStateDescriptor);

broadcastState.put(key, flinkCdcTableInfoBean);
}
}).name("MySQL data sync stream");

从checkpoint重启,processBroadcastElement函数被修改掉,等于是广播的状态broadcastState被修改掉了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {

String tableInfo = JSON.parseObject(value.f1).getString("data");

FlinkCdcTableInfoBean flinkCdcTableInfoBean = JSON.parseObject(tableInfo, FlinkCdcTableInfoBean.class);

if (StringUtils.isNotEmpty(flinkCdcTableInfoBean.getSecretColumns())) {
List<FlinkCdcTableInfoBean.SecretColumnDTO> secretColumns = JSONValidator.from(flinkCdcTableInfoBean.getSecretColumns()).validate() ?
JSON.parseArray(flinkCdcTableInfoBean.getSecretColumns(), FlinkCdcTableInfoBean.SecretColumnDTO.class) : null;
flinkCdcTableInfoBean.setSecretColumnDTO(secretColumns);
}
if (StringUtils.isNotEmpty(flinkCdcTableInfoBean.getAddColumns())) {
List<FlinkCdcTableInfoBean.AddColumnDTO> addColumnDTO = JSONValidator.from(flinkCdcTableInfoBean.getAddColumns()).validate() ?
JSON.parseArray(flinkCdcTableInfoBean.getAddColumns(), FlinkCdcTableInfoBean.AddColumnDTO.class) : null;
flinkCdcTableInfoBean.setAddColumnsDTO(addColumnDTO);
}

String key = flinkCdcTableInfoBean.getSourceDb() + "_" + flinkCdcTableInfoBean.getSourceTableName() + "_" + flinkCdcTableInfoBean.getId();
 BroadcastState<String, FlinkCdcTableInfoBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
broadcastState.put(key, flinkCdcTableInfoBean);
}
}).name("MySQL data sync stream");

修改办法:

在使用 BroadcastState 时,需要注意广播状态数据的一致性。如果广播状态中的数据在 checkpoint 之前发生了修改,那么在从 checkpoint 恢复状态时,可能会导致状态不一致。

为了保证广播状态数据的一致性,在使用 BroadcastState 时,可以考虑使用以下两种方法:

对广播状态的修改进行同步:在对广播状态进行修改时,需要确保只有一个线程进行修改,避免并发修改导致数据不一致。可以使用锁或者线程安全的数据结构来实现。

使用版本号控制状态的变更:在广播状态中加入版本号,每次更新状态时,都更新版本号。在从 checkpoint 恢复状态时,通过比较版本号来判断状态是否一致。如果版本号不一致,说明状态已经被修改,需要重新更新状态。可以使用 ValueState 来存储版本号。


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

Flink程序从checkpoint点启动-广播流修改问题
http://yuting0907.github.io/posts/c490a47a.html
作者
Echo Yu
发布于
2023年4月14日
许可协议