Flink MULTI JDBC CONNECTOR

A JDBC connector for sharded databases and tables. It currently supports batch mode only.

Parameters

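| Parameter | Required | Description |
| --- | --- | --- |
| connector | | Set to multi-jdbc |
| url | | JDBC connection strings, separated by semicolons: jdbc:mysql://${connection1};jdbc:mysql://${connection2} |
| table-name | | Regular expressions are supported |
| schema-name | | Regular expressions are supported |
| username | | User name |
| password | | Password |
| driver | | |
| scan.partition.column | | Column used for splitting, ideally an auto-increment id primary key |
| scan.batch.size | | One of scan.batch.size and scan.partition.num must be set. Size of each batch; each table incurs one extra select count(1) query to obtain its row count for the calculation |
| scan.partition.num | | One of scan.batch.size and scan.partition.num must be set. Number of splits per table |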
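
To make the url and table-name / schema-name options more concrete, here is a minimal sketch of how a semicolon-separated url value could be split and how a regular expression could select shard tables. The class MultiJdbcOptions, its methods, and the host and table names are hypothetical, and full-string matching is an assumption; the connector's real matching rules may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the connector. */
final class MultiJdbcOptions {

    /** Splits the 'url' option on semicolons and drops empty parts. */
    static List<String> splitUrls(String url) {
        List<String> urls = new ArrayList<>();
        for (String part : url.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                urls.add(trimmed);
            }
        }
        return urls;
    }

    /** Keeps the names that match the regex as a whole (assumption: full match). */
    static List<String> matchNames(String regex, List<String> candidates) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String name : candidates) {
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Placeholder hosts and table names, for illustration only.
        System.out.println(splitUrls("jdbc:mysql://host1:3306;jdbc:mysql://host2:3306"));
        System.out.println(matchNames("order_\\d+", Arrays.asList("order_0", "order_1", "order_log")));
    }
}
```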

Design

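The connector is built on Flink's FLIP-27 source interface and reads sharded databases and tables over JDBC. The enumerator (which runs inside the JobManager) is responsible for:

  1. Finding the databases whose names match.
  2. Finding the tables whose names match.
  3. Fetching the maximum and minimum values of the partition column for each table.
  4. Computing the step of one batch.
    1. If batch size is set: get the table's row count with select count, then compute the step as batch size / total rows * (max id - min id).
    2. If partition num is set: compute the step as (max id - min id) / number of partitions.
  5. Generating the corresponding SQL from the step (select * from xxxx where column between xxxx and xxxx).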
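
Steps 4 and 5 can be sketched roughly as follows. This is an illustration of the formulas above, not the connector's actual code: the class SplitPlanner and its method names are hypothetical, and rounding the step up and keeping it at least 1 are assumptions. Because BETWEEN is inclusive on both ends, each range here starts one past the previous upper bound so that no row is read twice.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the enumerator's split planning. */
final class SplitPlanner {

    /** Step when scan.batch.size is set: batchSize / rowCount * (maxId - minId). */
    static long stepFromBatchSize(long batchSize, long rowCount, long minId, long maxId) {
        double ratio = (double) batchSize / rowCount;
        // Round up and never go below 1 (assumption of this sketch).
        return Math.max(1L, (long) Math.ceil(ratio * (maxId - minId)));
    }

    /** Step when scan.partition.num is set: (maxId - minId) / partitionNum, rounded up. */
    static long stepFromPartitionNum(long partitionNum, long minId, long maxId) {
        long range = maxId - minId;
        return Math.max(1L, (range + partitionNum - 1) / partitionNum);
    }

    /** Generates one non-overlapping BETWEEN query per step-sized id range. */
    static List<String> buildQueries(String table, String column, long minId, long maxId, long step) {
        List<String> queries = new ArrayList<>();
        for (long lower = minId; lower <= maxId; lower += step) {
            long upper = Math.min(lower + step - 1, maxId);
            queries.add("SELECT * FROM " + table + " WHERE " + column
                    + " BETWEEN " + lower + " AND " + upper);
        }
        return queries;
    }

    public static void main(String[] args) {
        // Placeholder table name and id range, for illustration only.
        long step = stepFromPartitionNum(4, 1, 100);
        for (String sql : buildQueries("some_table", "id", 1, 100, step)) {
            System.out.println(sql);
        }
    }
}
```

With ids that are dense and auto-incremented, every range holds roughly the same number of rows; with sparse or skewed ids some ranges may be nearly empty, which is why an auto-increment primary key is the recommended partition column.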

Code

package com.itiaoling.app.ods;

import com.alibaba.fastjson.JSONObject;
import com.itiaoling.bean.batch.GoodsEbayCompatibilityModel;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;

public class EscMysql2DorisBatch {

    public static void main(String[] args) throws Exception {

        // Checkpoint settings for the job.
        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.interval", "10000");
        conf.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
        conf.setString("state.checkpoints.num-retained", "2");
        conf.setString("state.checkpoints.dir", "hdfs:/ns/flink/checkpoints/esc_buyer1_batch");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(10);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table backed by the multi-jdbc connector.
        tEnv.executeSql(
                "CREATE TABLE `goods_ebay_compatibility_model` (\n" +
                " `id` Integer,\n" +
                " `site` STRING,\n" +
                " `category_id` STRING,\n" +
                " `esc_category_id` STRING,\n" +
                " `model_id` STRING,\n" +
                " `version` STRING,\n" +
                " `update_type` Integer,\n" +
                " `create_time` Timestamp,\n" +
                " `update_time` Timestamp,\n" +
                " `deleted` Integer\n" +
                ")" +
                "WITH (\n" +
                "\t'connector' = 'multi-jdbc',\n" +
                "\t'table-name' = 'XXXXX',\n" +
                "\t'schema-name' = 'isc_buyer',\n" +
                "\t'username' = 'XXXXX',\n" +
                "\t'password' = 'XXXXX',\n" +
                "\t'scan.partition.column' = 'id',\n" +
                // Set either scan.batch.size or scan.partition.num.
                // "\t'scan.batch.size' = '5000',\n" +
                "\t'scan.partition.num' = '10000',\n" +
                "\t'url' = 'jdbc:mysql://XXX:3306?useSSL=false'\n" +
                ")");

        // Select the columns to sync and format the timestamps as strings.
        Table table = tEnv.sqlQuery("select t.id," +
                "t.site," +
                "t.category_id," +
                "t.esc_category_id," +
                "t.model_id," +
                "t.version," +
                "t.update_type," +
                "DATE_FORMAT(t.create_time, 'yyyy-MM-dd HH:mm:ss') as create_time," +
                "DATE_FORMAT(t.update_time, 'yyyy-MM-dd HH:mm:ss') as update_time," +
                "t.deleted," +
                "0 as is_deleted_flg, " +
                "1673853468700 as ts_ms " +
                "from XXXX t "
                // "where t.version = '202211-0002'"
        );

        // table.printSchema();

        // Convert each row to the mapping bean, then serialize it to a JSON string.
        DataStream<GoodsEbayCompatibilityModel> ordersDataStream = tEnv.toDataStream(table, GoodsEbayCompatibilityModel.class);
        SingleOutputStreamOperator<Object> map = ordersDataStream.map(new MapFunction<GoodsEbayCompatibilityModel, Object>() {
            @Override
            public Object map(GoodsEbayCompatibilityModel goodsEbayCompatibilityModel) throws Exception {
                return JSONObject.toJSON(goodsEbayCompatibilityModel).toString();
            }
        });

        // map.print();

        // Stream Load properties for the Doris sink.
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty("format", "json");
        sinkProperties.setProperty("strip_outer_array", "true");

        map.addSink(
                DorisSink.sink(
                        DorisReadOptions.builder().build(),
                        DorisExecutionOptions.builder()
                                .setBatchSize(100000)
                                .setBatchIntervalMs(20000L)
                                .setMaxRetries(3)
                                .setStreamLoadProp(sinkProperties).build(),
                        DorisOptions.builder()
                                .setFenodes("XXX:8030,XXX:8030,XXX:8030")
                                .setTableIdentifier("ods.ods_bak")
                                .setUsername("XXX")
                                .setPassword("XXXX")
                                .build()
                ));

        env.execute("mysql-sink-doris-batch");
    }
}

Reference: https://github.com/peng128/flink-connector-multi-jdbc

Usage

1. Add the dependency to the pom file

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>FlinkMultiJdbcConnector</artifactId>
    <version>1.0</version>
</dependency>

2. Build https://github.com/peng128/flink-connector-multi-jdbc into a jar

3. Install the FlinkMultiJdbcConnector jar into the local repository manually with mvn install

4. Write the EscMysql2DorisBatch and GoodsEbayCompatibilityModel classes

  GoodsEbayCompatibilityModel: the bean class that maps the fields of the MySQL table being synchronized

  EscMysql2DorisBatch: the batch synchronization class

5. Launch the job

flink run -m yarn-cluster -ys 5 -ynm esc-multi-lang-test -yD taskmanager.memory.managed.fraction=0.2 -yjm 2048 -ytm 4096 -c com.itiaoling.app.ods.EscMultiLangMysql2DorisBatch -z mysql2doris_multi-lang-test -d /home/appman/realtime_warehouse/jar/esc_batch/PrdGoodsMultiLang.jar

Limitations

1. Synchronization is only fast when the partition column is an auto-increment primary key

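2. datetime columns are not yet supported as the partition column

The partition column is turned into queries of the form select where xxx between 0 and 1000. If xxx is a string column, this causes implicit type conversion.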
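
One way to catch this early is to check the JDBC type of the partition column before any range queries are planned. The sketch below is a hypothetical pre-flight check, not something the connector is known to do; the class and method names are made up for illustration, and the list of accepted types is an assumption.

```java
import java.sql.Types;

/** Hypothetical guard; the connector itself may not perform this check. */
final class PartitionColumnCheck {

    /** Returns true only for integral JDBC types, which suit numeric BETWEEN ranges. */
    static boolean isIntegralType(int jdbcType) {
        switch (jdbcType) {
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
            case Types.BIGINT:
                return true;
            default:
                // Strings, datetimes and everything else are rejected.
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("BIGINT ok: " + isIntegralType(Types.BIGINT));
        System.out.println("VARCHAR ok: " + isIntegralType(Types.VARCHAR));
        System.out.println("TIMESTAMP ok: " + isIntegralType(Types.TIMESTAMP));
    }
}
```

The type code could come from standard JDBC metadata, for example the DATA_TYPE column returned by DatabaseMetaData.getColumns.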

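3. Under the hood the connector queries the database with SELECT statements over JDBC. Compared with DataX's synchronization approach, it is essentially the same.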

DataX's splitting strategy (reference: https://cloud.tencent.com/developer/article/1694188)

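1) Compute the concurrency (that is, the value of needChannelNumber)

DataX has a flow-control mode in which a bps limit and a tps limit can be set:

  • bps limit: needChannelNumber = total byteLimit / byteLimit per Channel

  • tps limit: needChannelNumber = total recordLimit / recordLimit per Channel

    If neither is set, needChannelNumber is taken from the concurrency the user configured in job.setting.speed.channel.

2) Split the Job into multiple Tasks according to needChannelNumber

The actual splitting logic in this step is delegated to the relevant plugin. For example, Rdb splits data in two main ways:

  • If the user configured a specific number of Tables, the split uses the Table as the smallest unit (one Table per Task) and generates the corresponding querySql;

  • If the user also configured splitPk, the Table is split by splitPk ranges and the corresponding querySql is generated.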
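
The channel-count rule from step 1) can be written out as a small sketch. This is an illustration of the two formulas above, not DataX's source: ChannelPlanner and its parameter names are hypothetical, a limit of 0 stands for "not set", and taking the smaller result when both limits are set, as well as the floor of 1, are assumptions of this sketch.

```java
/** Hypothetical sketch of the needChannelNumber calculation. */
final class ChannelPlanner {

    static int needChannelNumber(long totalByteLimit, long channelByteLimit,
                                 long totalRecordLimit, long channelRecordLimit,
                                 int configuredChannels) {
        boolean byBytes = totalByteLimit > 0 && channelByteLimit > 0;
        boolean byRecords = totalRecordLimit > 0 && channelRecordLimit > 0;

        // bps limit: total byteLimit / byteLimit per Channel, at least 1.
        int fromBytes = byBytes
                ? (int) Math.max(1L, totalByteLimit / channelByteLimit)
                : Integer.MAX_VALUE;
        // tps limit: total recordLimit / recordLimit per Channel, at least 1.
        int fromRecords = byRecords
                ? (int) Math.max(1L, totalRecordLimit / channelRecordLimit)
                : Integer.MAX_VALUE;

        if (byBytes || byRecords) {
            // Assumption: when both limits are set, the stricter one wins.
            return Math.min(fromBytes, fromRecords);
        }
        // Neither limit set: fall back to job.setting.speed.channel.
        return configuredChannels;
    }

    public static void main(String[] args) {
        // 10 MB/s in total at 1 MB/s per channel, no tps limit.
        System.out.println(needChannelNumber(10L * 1024 * 1024, 1024 * 1024, 0, 0, 5));
    }
}
```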


http://yuting0907.github.io/posts/3f914a6b.html
Author: Echo Yu
Published: January 16, 2023