原本的processBroadcastElement函数逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { String tableInfo = JSON.parseObject(value.f1).getString("data"); FlinkCdcTableInfoBean flinkCdcTableInfoBean = JSON.parseObject(tableInfo, FlinkCdcTableInfoBean.class); String key = flinkCdcTableInfoBean.getSourceDb() + "_" + flinkCdcTableInfoBean.getSourceTableName() + "_" + flinkCdcTableInfoBean.getId();
BroadcastState<String, FlinkCdcTableInfoBean> broadcastState=ctx.getBroadcastState(mapStateDescriptor);
broadcastState.put(key, flinkCdcTableInfoBean); } }).name("MySQL data sync stream");
|
从checkpoint重启,processBroadcastElement函数被修改掉,等于是广播的状态broadcastState被修改掉了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { String tableInfo = JSON.parseObject(value.f1).getString("data"); FlinkCdcTableInfoBean flinkCdcTableInfoBean = JSON.parseObject(tableInfo, FlinkCdcTableInfoBean.class); if (StringUtils.isNotEmpty(flinkCdcTableInfoBean.getSecretColumns())) { List<FlinkCdcTableInfoBean.SecretColumnDTO> secretColumns = JSONValidator.from(flinkCdcTableInfoBean.getSecretColumns()).validate() ? JSON.parseArray(flinkCdcTableInfoBean.getSecretColumns(), FlinkCdcTableInfoBean.SecretColumnDTO.class) : null; flinkCdcTableInfoBean.setSecretColumnDTO(secretColumns); } if (StringUtils.isNotEmpty(flinkCdcTableInfoBean.getAddColumns())) { List<FlinkCdcTableInfoBean.AddColumnDTO> addColumnDTO = JSONValidator.from(flinkCdcTableInfoBean.getAddColumns()).validate() ? JSON.parseArray(flinkCdcTableInfoBean.getAddColumns(), FlinkCdcTableInfoBean.AddColumnDTO.class) : null; flinkCdcTableInfoBean.setAddColumnsDTO(addColumnDTO); } String key = flinkCdcTableInfoBean.getSourceDb() + "_" + flinkCdcTableInfoBean.getSourceTableName() + "_" + flinkCdcTableInfoBean.getId();
BroadcastState<String, FlinkCdcTableInfoBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor); broadcastState.put(key, flinkCdcTableInfoBean); } }).name("MySQL data sync stream");
|
修改办法:
在使用 BroadcastState 时,需要注意广播状态数据的一致性。如果广播状态中的数据在 checkpoint 之前发生了修改,那么在从 checkpoint 恢复状态时,可能会导致状态不一致。
为了保证广播状态数据的一致性,在使用 BroadcastState 时,可以考虑使用以下两种方法:
对广播状态的修改进行同步:在对广播状态进行修改时,需要确保只有一个线程进行修改,避免并发修改导致数据不一致。可以使用锁或者线程安全的数据结构来实现。
使用版本号控制状态的变更:在广播状态中加入版本号,每次更新状态时,都更新版本号。在从 checkpoint 恢复状态时,通过比较版本号来判断状态是否一致。如果版本号不一致,说明状态已经被修改,需要重新更新状态。可以使用 ValueState 来存储版本号。