记一次Flinkcdc调参

场景描述:

利用Flinkcdc从mysql中抓取数据到kafka,再写到doris数据库里

mysql表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CREATE TABLE `goods_aspects_relation_detail` (
`id` varchar(32) NOT NULL,
`GOODS_UID` varchar(32) NOT NULL COMMENT '商品ID',
`COM_UID` varchar(32) DEFAULT NULL COMMENT '公司',
`site` varchar(32) NOT NULL COMMENT '站点',
`out_category_id` varchar(32) DEFAULT NULL COMMENT '品类id',
`localized_aspect_name` varchar(82) DEFAULT NULL COMMENT '名称',
`localized_aspect_values` varchar(512) DEFAULT NULL COMMENT '值',
`aspect_data_type` varchar(32) DEFAULT NULL COMMENT '数据类型',
`aspect_enabled_for_variations` tinyint(1) DEFAULT NULL,
`aspect_format` varchar(32) DEFAULT NULL COMMENT '格式',
`aspect_max_length` bigint(20) DEFAULT NULL COMMENT '最大长度',
`aspect_mode` varchar(32) DEFAULT NULL COMMENT '功能',
`aspect_required` tinyint(1) DEFAULT NULL COMMENT '是否必须',
`aspect_usage` varchar(32) DEFAULT NULL COMMENT '使用',
`expected_required_by_date` varchar(32) DEFAULT NULL,
`item_to_aspect_cardinality` varchar(32) DEFAULT NULL,
`aspect_applicable_to` varchar(255) DEFAULT NULL,
`aspect_values` longtext,
`is_active` tinyint(1) NOT NULL DEFAULT '1' COMMENT ' 数据是否已被逻辑删除,0 已删除,1 未删除',
`creator` varchar(32) DEFAULT NULL COMMENT '创建人',
`modifier` varchar(32) DEFAULT NULL COMMENT '修改人',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`modify_time` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
KEY `GOODS_UID_site_out_category_id_com_id` (`GOODS_UID`,`site`,`out_category_id`,`COM_UID`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品Aspect明细表'

其中aspect_values longtext,字段超大,估计塞了一篇网文还持续连载的那种🤮

1
2
3
4
5
6
--通过下列语句查表空间和记录数,以及每行数据的大小
select TABLE_ROWS, concat(round(sum(DATA_LENGTH/1024/1024/1024), 2),'GB') as totalSize
, concat(round(sum(DATA_LENGTH/1024), 2)/TABLE_ROWS,'KB') as recordSize
from information_schema.TABLES
where table_schema='isc_goods'
and table_name='goods_aspects_relation_detail';

可以看到整张表占6.04GB, 其中aspect_values大字段就占5.8G,最大的字段长度是800+K

Debug过程

flinkcdc任务提交在yarn集群上,没有看到任何报错信息(可能本身写的代码输出信息比较少),以至于在taskmanager上没看到任何报错信息,但是kafka的topic就是不生成。

尝试将source来的data打印print出来,这时候发现报错信息了

1.首先找到yarn上刚提交的applicationid

2.找到对应的applicationid后点击进去,可以追踪到ApplicationMaster

Tracking URL: [ApplicationMaster]

3.点击对应的ApplicationMaster进来后的界面

4.查看日志输出,发现是GC overhead limit exceeded,也就是超出了taskmanager的指定内存了

5.尝试将ytm参数,也就是在提交运行flink任务的时候的taskmanager的内存提高到8G -ytm 2048

发现可以正常生成kafka topic,至此mysql -> kafka这一层可以正常work

6.在kafka sink到doris这一环节中

由于配置的dorisBatchSize: 5000,报JVM heap内存溢出,暂时将其调整为dorisBatchSize: 2000,报错消失。


后续可以尝试调整参数:

1
2
3
4
5
6
#CDC Connectors for Apache Flink

scan.incremental.snapshot.enabled: true
scan.incremental.snapshot.chunk.size: 100
scan.snapshot.fetch.size: 100
debezium.min.row.count.to.stream.result: 100

个人理解下来是减小每次拉取的数量,这是在内存不足的情况下可以尝试减小fetch size,chunk size的大小数量。但是我这内存充足就没进行相应的调优

参考官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#

还可以调整并行度和slot数量观察数据抽取速度


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

记一次Flinkcdc调参
http://yuting0907.github.io/posts/5e397986.html
作者
Echo Yu
发布于
2022年5月26日
许可协议