Flink MULTI JDBC CONNECTOR
提供分库分表的jdbc链接方法,目前只提供批模式。
参数
参数名称 | 是否必填 | 说明 |
---|---|---|
connector | 是 | 填写multi-jdbc |
url | 是 | jdbc的链接串,用分号分割。jdbc:mysql://${connection1};jdbc:mysql://${connection2} |
table-name | 是 | 可用正则匹配 |
schema-name | 是 | 可用正则匹配 |
username | 否 | 用户名 |
password | 否 | 密码 |
driver | 否 | |
scan.partition.column | 是 | 分片的字段,最好是自增id主键 |
scan.batch.size | 否 | scan.batch.size 和 scan.partition.num必填写一个。每个batch的大小,每个表会多一个select count(1)的查询去获取表的数据量进行计算 |
scan.partition.num | 否 | scan.batch.size 和 scan.partition.num必填写一个。每张表分多少个片。 |
设计
基于flip-27设计的flink接口,实现的分库分表jdbc的链接器。 enumerator(在jobmanager内)负责
- 把符合条件的库名找出来。
- 把符合表名称的表明拿出来。
- 取这张表的最大值以及最小值。
- 获取一个批次的步长。
- 若填写了batch size。则通过select count 获取表行数计算,在使用 批次大小/总行数 * (max id - min id)计算出步长。
- 拖填写了partition num。 则使用 (max id - min id) / 分片数量 计算出步长。
- 根据步长来生成对应的sql(select * from xxxx where column between xxxx and xxxx)
代码
1 |
|
参考:https://github.com/peng128/flink-connector-multi-jdbc
使用方式
1.pom文件引入依赖
1 |
|
2.将https://github.com/peng128/flink-connector-multi-jdbc打成jar包
3.mvn install手动引入FlinkMultiJdbcConnector的jar包
4.编写EscMysql2DorisBatch类和GoodsEbayCompatibilityModel类
GoodsEbayCompatibilityModel:同步的mysql表字段的映射bean类
EscMysql2DorisBatch:批量同步类
5.启动代码
flink run -m yarn-cluster -ys 5 -ynm esc-multi-lang-test -yD taskmanager.memory.managed.fraction=0.2 -yjm 2048 -ytm 4096 -c com.itiaoling.app.ods.EscMultiLangMysql2DorisBatch -z mysql2doris_multi-lang-test -d /home/appman/realtime_warehouse/jar/esc_batch/PrdGoodsMultiLang.jar
不足
1.需要自增主健同步才快
2.暂时不支持datatime格式作为分割的列,指定的分割字段
会根据分割字段转换成select where xxx between 0 and 1000 的查询sql,如果xxx为string类型,会存在隐式转换的问题
3.原理是使用JDBC的方式select查询数据库,可以对比和datax的同步方式,本质是一样的
datax的切分策略,参考网页:https://cloud.tencent.com/developer/article/1694188
1)计算并发量(即 needChannelNumber 大小)
DataX有流控模式,其中,可以设置 bps 限速,tps 限速:
bps 限速:needChannelNumber = 总 byteLimit / 单个 Channel byteLimit
tps 限速:needChannelNumber = 总 recordLimit / 单个 Channel recordLimit
如果以上都没有设置,则会根据用户在
job.setting.speed.channel
配置的并发数量设置 needChannelNumber。
2)根据 needChannelNumber 将 Job 切分成多个 Task
这个步骤的具体切分逻辑交由相关插件去完成,例如 Rdb 对数据的拆分主要分成两类:
如果用户配置了具体的 Table 数量,那么就按照 Table 为最小单元进行拆分(即一个 Table 对应一个 Task),并生成对应的 querySql;
如果用户还配置了 splitPk,则会根据 splitPk 进行切分,具体逻辑是根据 splitPk 区间对 Table 进行拆分,并生成对应的 querySql。
觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭
wechat pay
alipay