flinkCDC抽取mysql数据问题记录

情景描述

利用flinkcdc抽取mysql数据到kafka,再从kafka消费数据到Doris。

上游mysql的delete操作,在下游Doris做成逻辑删除,即表增加一列is_delete_flg做为是否删除的标志。Doris表设置为unique模型,保证端对端的幂等性,保证数据不会重复。其中Doris的unique模型表需要设置sequence列。

flinkcdc官网介绍的op_ts操作时间正好可以做为sequence。

在同步了两百多张表后,发现有一张表Doris的is_delete_flg=1,也就是逻辑删除的状态,而mysql表里并没有删除。


定位问题

定位问题的思路就是打印原始数据看看,直接flinkcdc读取过来然后做print打印原始数据

1
2
3
4
5
6
7
8
9
10
------Struct-原始value数据如下:--------------------
Struct{before=Struct{goods_id=311054013088959091,send_location=US},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1654866586000,db=isc_goods,table=prd_goods_send_location,server_id=50103306,gtid=25e11912-aa95-11ec-9f6c-002248168efc:3002800,file=mysql_bin.000023,pos=647582276,row=0},op=d,ts_ms=1654866586278}

------DELETE-value数据如下:--------------------
Struct{before=Struct{goods_id=311054013088959091,send_location=US},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1654866586000,db=isc_goods,table=prd_goods_send_location,server_id=50103306,gtid=25e11912-aa95-11ec-9f6c-002248168efc:3002800,file=mysql_bin.000023,pos=647582276,row=0},op=d,ts_ms=1654866586278}
{"op":"-D","data":{"send_location":"US","goods_id":"311054013088959091","is_deleted_flg":1,"ts_ms":1654866586278},"source":"isc_goods.prd_goods_send_location"}

------Struct-原始value数据如下:--------------------
Struct{after=Struct{goods_id=311054013088959091,send_location=US},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1654866586000,db=isc_goods,table=prd_goods_send_location,server_id=50103306,gtid=25e11912-aa95-11ec-9f6c-002248168efc:3002800,file=mysql_bin.000023,pos=647582531,row=0},op=c,ts_ms=1654866586278}
{"op":"+I","data":{"send_location":"US","goods_id":"311054013088959091","is_deleted_flg":0,"ts_ms":1654866586278},"source":"isc_goods.prd_goods_send_location"}

发现delete的”ts_ms”:1654866586278和insert的 “ts_ms”:1654866586278是在同一毫秒发生,此时到Doris就会随机取一条数据,可能取到delete的那一条,所以标志is_delete_flg=1,而mysql由于还有insert同一条数据,故而发生了👆描述的问题。

猜想上游发生此数据现象的原因

可能delete和insert放在同一个事物中提交了,于是找上游要了处理逻辑

发现真是批量提交了

解决方案

1.可以将insert操作的op_ts➕1s使得和delete的操作时间不同

2.因为数据量比较小,我直接采用了select方式的datax方式抽取数据


觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

flinkCDC抽取mysql数据问题记录
http://yuting0907.github.io/2022/06/19/flinkCDC抽取mysql数据问题记录/
作者
Echo Yu
发布于
2022年6月19日
许可协议