FlinkCDC-2.0-原理与实践

Flinkcdc原理

一、CDC 概述

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:

  • 数据同步:用于备份,容灾;
  • 数据分发:一个数据源分发给多个下游系统;
  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。

CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

  • 基于查询的 CDC:
    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟。
  • 基于日志的 CDC:
    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

对比常见的开源 CDC 方案,我们可以发现:

  • 对比增量同步能力,
    • 基于日志的方式,可以很好的做到增量同步;
    • 而基于查询的方式是很难做到增量同步的。
  • 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
  • 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
  • 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
  • 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
    • 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
    • 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
  • 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。

Dynamic Table & ChangeLog Stream

  • Dynamic Table 就是 Flink SQL 定义的动态表,动态表和流的概念是对等的。参照上图,流可以转换成动态表,动态表也可以转换成流。
  • 在 Flink SQL中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的形式,任意时刻的 Changelog Stream 可以翻译为一个表,也可以翻译为一个流。

联想下 MySQL 中的表和 binlog 日志,就会发现:MySQL 数据库的一张表所有的变更都记录在 binlog 日志中,如果一直对表进行更新,binlog 日志流也一直会追加,数据库中的表就相当于 binlog 日志流在某个时刻点物化的结果;

日志流就是将表的变更数据持续捕获的结果。这说明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一张不断变化的 MySQL 数据库表。

Flink CDC 1.x 可以不加锁,能够满足大部分场景,但牺牲了一定的数据准确性。Flink CDC 1.x 默认加全局锁,虽然能保证数据一致性,但存在上述 hang 住数据的风险。

通过上面的分析,可以知道 2.0 的设计方案,核心要解决上述的三个问题,即支持无锁、水平扩展、checkpoint。

单个chunk无锁一致性读

Flinkcdc2.0借鉴了DBlog 这篇论文里描述的无锁算法,利用Chunk 的切分算法可以实现数据同步任务多并发执行,和很多数据库的分库分表原理类似,Chunk切分算法通过表的主键对表中的数据进行分片。假设每个 Chunk 的步长为 10,按照这个规则进行切分,只需要把这些 Chunk 的区间做成左开右闭或者左闭右开的区间,保证衔接后的区间能够等于表的主键区间即可。

对于每个 Chunk 的无锁读算法,该算法的核心思想是在划分了 Chunk 后,对于每个 Chunk 的全量读取和增量读取,在不用锁的条件下完成一致性的合并。Chunk 的切分如下图所示:

因为每个 chunk 只负责自己主键范围内的数据,不难推导,只要能够保证每个 Chunk 读取的一致性,就能保证整张表读取的一致性,这便是无锁算法的基本原理。

Netflix 的 DBLog 论文中 Chunk 读取算法是通过在 DB 维护一张信号表,再通过信号表在 binlog 文件中打点,记录每个 chunk 读取前的 Low Position (低位点) 和读取结束之后 High Position (高位点) ,首先在低位点去查询该 Chunk 的全量数据。在读取出这一部分 Chunk 的数据之后,再将这 2 个位点之间的 binlog 增量数据合并到 chunk 所属的全量数据,从而得到高位点时刻chunk 对应的全量数据。

Flink CDC 结合自身的情况,在 Chunk 读取算法上做了去信号表的改进,不需要额外维护信号表,通过直接读取 binlog 位点替代在 binlog 中做标记的功能,整体的 chunk 读算法描述如下图所示:

比如正在读取 Chunk-1,Chunk 的区间是 [K1, K10],首先直接将该区间内的数据 select 出来并把它存在 buffer 中,在**select 之前记录 binlog 的一个位点 (低位点)select 完成后记录 binlog 的一个位点 (高位点)**。然后开始增量部分,消费从低位点到高位点的 binlog。

  • 图中的 - ( k2,100 ) + ( k2,108 ) 记录表示这条数据的值从 100 更新到 108;
  • 第二条记录是删除 k3;
  • 第三条记录是更新 k2 为 119;
  • 第四条记录是 k5 的数据由原来的 77 变更为 100。

观察图片中右下角最终的输出,会发现在消费该 chunk 的 binlog 时,出现的 key 是k2、k3、k5,我们前往 buffer 将这些 key 做标记。

  • 对于 k1、k4、k6、k7 来说,在高位点读取完毕之后,这些记录没有变化过,所以这些数据是可以直接输出的;
  • 对于改变过的数据,则需要将增量的数据合并到全量的数据中,只保留合并后的最终数据。例如,k2 最终的结果是 119 ,那么只需要输出 +(k2,119),而不需要中间发生过改变的数据。

通过这种方式,Chunk 最终的输出就是在高位点是 chunk 中最新的数据。

多个Chunk一致性读

上图描述的是单个 Chunk 的一致性读,但是如果有多个表分了很多不同的 Chunk,且这些 Chunk 分发到了不同的 task 中,那么如何分发 Chunk 并保证全局一致性读呢?

这个就是基于 FLIP-27 来优雅地实现的,通过下图可以看到有 SourceEnumerator 的组件,这个组件主要用于 Chunk 的划分,划分好的 Chunk 会提供给下游的 SourceReader 去读取,通过把 chunk 分发给不同的 SourceReader 便实现了并发读取 Snapshot Chunk 的过程,同时基于 FLIP-27 我们能较为方便地做到 chunk 粒度的 checkpoint。

当 Snapshot Chunk 读取完成之后,需要有一个汇报的流程,如下图中橘色的汇报信息,将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

汇报的主要目的是为了后续分发 binlog chunk (如下图)。因为 Flink CDC 支持全量 + 增量同步,所以当所有 Snapshot Chunk 读取完成之后,还需要消费增量的 binlog,这是通过下发一个 binlog chunk 给任意一个 Source Reader 进行单并发读取实现的。

Flinkcdc实践

Flink 版本是 1.14,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段: MySQL CDC 2.0 用时 13 分钟;1亿数据量的表,Source并发为1,全量读取阶段MySQL CDC 2.0 用时2小时

flinkcdc多库多表同步代码:(Mysql->Kafka->Doris)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
## Mysql2Kafka
package com.itiaoling.app.ods

import com.itiaoling.bean.config.{KafkaSinkConf, MySqlSourceConf, MySqlToKafkaConf}
import com.itiaoling.source.MySqlSourceFunction
import com.itiaoling.util.ExecutionEnvUtil
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import java.util
import java.util.Map

import com.itiaoling.sink.KafkaSinkFunction

object MySqlToKafka {

def main(args: Array[String]): Unit = {
val parameterTool = ParameterTool.fromArgs(args)
val active = parameterTool.get("spring.profiles.active")
val sourceSinkType = Integer.valueOf(parameterTool.get("sourceSinkType"))
val businessType = Integer.valueOf(parameterTool.get("businessType"))
val tableName = parameterTool.get("tableName")
//mysql2kafka配置信息
val mySqlToKafkaConf: MySqlToKafkaConf = ExecutionEnvUtil.getSourceSink(active, sourceSinkType, businessType, tableName).asInstanceOf[MySqlToKafkaConf]

val flinkEnv: util.Map[String, String] = mySqlToKafkaConf.flinkEnv
val mysqlConf: MySqlSourceConf = mySqlToKafkaConf.mysqlSourceConf
val sinkConfig: KafkaSinkConf = mySqlToKafkaConf.kafkaSinkConf


val conf = new Configuration
val it: util.Iterator[Map.Entry[String, String]] = flinkEnv.entrySet().iterator()
while (it.hasNext) {
val kv: Map.Entry[String, String] = it.next()
conf.setString(kv.getKey, kv.getValue)
}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(conf)

val mySqlSource: MySqlSource[String] = MySqlSourceFunction.createSourceFunction(mysqlConf)
// val prefix = sinkConfig.getKafkaTopicPrefix
// val suffix = sinkConfig.getKafkaTopicSuffix
// val topic: String = prefix + "_" + tableName + "_" + suffix
// val sink: KafkaSink[String] = KafkaSink.builder[String].setBootstrapServers(sinkConfig.getBootstrapServers)
// .setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build)
// .build
//使用此方法,可以将配置文件中同步所有表分别同步到不同的kafka的topic中
env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks[String],"MySQL Source")
.addSink(KafkaSinkFunction.createSinkFunction2(sinkConfig)).name(tableName)

// env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String], "MySQL Source").sinkTo(sink).setParallelism(2)

env.execute(mySqlToKafkaConf.getAppName)

}

}



package com.itiaoling.source

import com.itiaoling.bean.config.MySqlSourceConf
import java.util
import java.util.{Map, Properties}

import com.itiaoling.app.function.JsonDebeziumDeserializeSchema
import com.ververica.cdc.connectors.mysql.source.MySqlSource

object MySqlSourceFunction {

def createSourceFunction(mysqlConf: MySqlSourceConf) = {
validateParameter(mysqlConf)
val debeziumProperties = new Properties()

val it: util.Iterator[Map.Entry[String, String]] = mysqlConf.debeziumProperties.entrySet().iterator()
while (it.hasNext) {
val kv: Map.Entry[String, String] = it.next()
debeziumProperties.setProperty(kv.getKey, kv.getValue)
}
MySqlSource.builder()
.hostname(mysqlConf.hostname)
.port(mysqlConf.port)
.scanNewlyAddedTableEnabled(true)
.databaseList(mysqlConf.mysqlDb)
.username(mysqlConf.username)
.password(mysqlConf.password)
.tableList(mysqlConf.tableName)
.serverTimeZone(debeziumProperties.getOrDefault("server-time-zone", "UTC").toString)
.deserializer(new JsonDebeziumDeserializeSchema(debeziumProperties.getOrDefault("server-time-zone", "UTC").toString, mysqlConf.addColumns))
.debeziumProperties(debeziumProperties)
.build()
}

def validateParameter(mysqlConf: MySqlSourceConf) = {
// Validater.NullValidate("hostname",mysqlConf.hostname)
}

}


package com.itiaoling.app.function

import java.time.ZoneId
import java.util.Map
import java.{sql, util}

import com.alibaba.fastjson.JSONObject
import com.itiaoling.util.DateUtils
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.data.Envelope
import io.debezium.data.Envelope.Operation
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.types.RowKind
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{Field, SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord


class JsonDebeziumDeserializeSchema(serverTimeZone: String, addColumns: java.util.Map[String, String]) extends DebeziumDeserializationSchema[String] {

var ts_ms: AnyRef = _

override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
val operation: Envelope.Operation = Envelope.operationFor(sourceRecord)
val value: Struct = sourceRecord.value().asInstanceOf[Struct]
val sourceStruct: Struct = value.getStruct("source")
val db: String = sourceStruct.get("db").asInstanceOf[String]
val table: String = sourceStruct.get("table").asInstanceOf[String]
val source: String = db.replaceAll("^*\\d$", "") + "." +
table.replaceAll("^*_\\d$", "")

ts_ms = value.get("ts_ms")

if (operation != Operation.CREATE && operation != Operation.READ) {
if (operation == Operation.DELETE) {
val data: JSONObject = extractData(value, "before")
data.put("is_deleted_flg",1)

val record: String = new JSONObject()
.fluentPut("op", RowKind.DELETE.shortString())
.fluentPut("data", addColumns(data))
.fluentPut("source", source)
.toJSONString
collector.collect(record)
} else {
val beforeData: JSONObject = extractData(value, "before")
val afterData: JSONObject = extractData(value, "after")
afterData.put("is_deleted_flg",0)

val map: util.Map[String, AnyRef] = beforeData.getInnerMap
val map1 = afterData.getInnerMap

map.putAll(map1)

val afterRecord: String = new JSONObject()
.fluentPut("source", source)
.fluentPut("op", RowKind.UPDATE_AFTER.shortString())
.fluentPut("data", addColumns(new JSONObject(map)))
.toJSONString
collector.collect(afterRecord)
}
} else {
val data: JSONObject = extractData(value, "after")
data.put("is_deleted_flg",0)
val record: String = new JSONObject()
.fluentPut("source", source)
.fluentPut("data", addColumns(data))
.fluentPut("op", RowKind.INSERT.shortString())
.toJSONString
collector.collect(record)
}

}

override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO


def extractData(struct: Struct, structType: String): JSONObject = {
import scala.collection.JavaConversions._
val valueStruct: Struct = struct.getStruct(structType)
val data = new JSONObject()

val fields: util.List[Field] = valueStruct.schema().fields()
val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
fields.foreach(field => {
val v: AnyRef = valueStruct.get(field)
val typ: String = field.schema().name()
val value: AnyRef = typ match {
//转成当地时区,本项目中的当地时间就是中国时间,所以直接写死
case Timestamp.SCHEMA_NAME => if (v == null) null else DateUtils.format(v.toString.toLong,ZoneId.of("UTC"))

case ZonedTimestamp.SCHEMA_NAME => if (v == null) null else sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(v, ZoneId.of("Asia/Shanghai"))).toString

case Date.SCHEMA_NAME =>if (v == null) null else DateUtils.format2(v.toString.toLong * 24 * 60 * 60 *1000L)
case _ => v
}
data.put(field.name(), value)
})

data
}


def addColumns(data: JSONObject): JSONObject = {
val it: util.Iterator[Map.Entry[String, String]] = addColumns.entrySet().iterator()
while (it.hasNext) {
val kv: Map.Entry[String, String] = it.next()
data.put(kv.getKey, kv.getValue)
}
data.put("ts_ms",ts_ms)
data
}


}

kafka2doris

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package com.itiaoling.app.ods;

import com.alibaba.fastjson.JSONObject;
import com.itiaoling.bean.Kafka2DorisConf;
import com.itiaoling.bean.config.DorisSinkConf;
import com.itiaoling.bean.config.KafkaSourceConf;
import com.itiaoling.bean.config.Synchronization;
import com.itiaoling.common.OffsetStrategyEnum;
import com.itiaoling.util.ExecutionEnvUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Description: Kafka Sink Doris程序
*
* @author: liu chao
* @date: 2022-03-04 17:03
*/
@Slf4j
public class Kafka2Doris {

public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String active = parameterTool.get("spring.profiles.active");
Integer sourceSinkType = Integer.valueOf(parameterTool.get("sourceSinkType"));
Integer businessType = Integer.valueOf(parameterTool.get("businessType"));
String tableName = parameterTool.get("tableName");
//kafka2doris配置信息
Kafka2DorisConf kafka2DorisConf = (Kafka2DorisConf) ExecutionEnvUtil.getYamlSourceSink(active, sourceSinkType, businessType, tableName);
//Flink环境配置信息
Map<String, String> flinkEnv = kafka2DorisConf.getFlinkEnv();
//kafka配置信息
KafkaSourceConf kafkaSourceConf = kafka2DorisConf.getKafkaSourceConf();
//Doris Sink信息
DorisSinkConf dorisSinkConf = kafka2DorisConf.getDorisSinkConf();
//同步配置信息
List<Synchronization> synchronizations = kafka2DorisConf.getSynchronizations();

Properties kafkaProp = new Properties();
kafkaProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConf.getBootstrapServers());
kafkaProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConf.getGroupId());
kafkaProp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConf.getOffsetConfig());

String dorisUrlWithPort = dorisSinkConf.getDorisUrlWithPort();
String dorisUser = dorisSinkConf.getDorisUser();
String dorisPassword = dorisSinkConf.getDorisPassword();


Configuration conf = new Configuration();
Iterator<Map.Entry<String, String>> it = flinkEnv.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, String> kv = it.next();
conf.setString(kv.getKey(), kv.getValue());
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
for (Synchronization synchronization : synchronizations) {
String sequenceCol = synchronization.getSequenceCol();

String kafkaTopic = synchronization.getKafkaTopic();
String dorisDb = synchronization.getDorisDb();
String dorisTable = synchronization.getDorisTable();
log.info("kafkaTopic: " + kafkaTopic + "\n" + "dorisDb:" + dorisDb + "\n" + "dorisTable:" + dorisTable+"\n");

Properties dorisProp = new Properties();
dorisProp.setProperty("format", "json");
dorisProp.setProperty("strip_outer_array", "true");
dorisProp.setProperty("function_column.sequence_col", sequenceCol);

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(kafkaSourceConf.getBootstrapServers())
.setTopics(kafkaTopic)
.setGroupId(kafkaSourceConf.getGroupId())
.setStartingOffsets(OffsetStrategyEnum.getStrategyByConfig(kafkaSourceConf.getOffsetConfig()))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> json = env.fromSource(source, WatermarkStrategy.noWatermarks(), kafkaTopic);

//将非正常的json数据剔除
SingleOutputStreamOperator<String> filter = json.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) {
Boolean result = true;
try {
JSONObject jsonObject = (JSONObject) JSONObject.parse(value);
Object data = jsonObject.get("data");
if (data == null || "null".equals(data)){
result = false;
}
} catch (Exception e) {
result = false;
log.error("json 格式化异常: " + value, e);
}
return result;
}
});

SingleOutputStreamOperator<String> rowData = filter.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
JSONObject jsonObject = (JSONObject) JSONObject.parse(value);
JSONObject data = (JSONObject) jsonObject.get("data");

if (data.get("create_time") == null || "null".equals(data.get("create_time"))) {
data.put("create_time", "2020-01-01 00:00:00");
}

out.collect(data.toJSONString());

}
});

rowData.addSink(DorisSink.sink(
DorisReadOptions.builder().build(),
DorisExecutionOptions.builder()
.setBatchSize(synchronization.getDorisBatchSize())
.setBatchIntervalMs(synchronization.getDorisBatchIntervalMs())
.setMaxRetries(synchronization.getMaxRetries())
.setStreamLoadProp(dorisProp).build(),
DorisOptions.builder()
.setFenodes(dorisUrlWithPort)
.setTableIdentifier(dorisDb + "." + dorisTable)
.setUsername(dorisUser)
.setPassword(dorisPassword).build()
)).name(dorisDb + "." + dorisTable);

}

env.execute(kafka2DorisConf.getAppName());
}

}

1.当yarn per job模式下一个flinkcdc作业同步多库多表数据时候, 如果该作业同步的表中有一个数据量比较大的表,建议将该表单独拎出来作为一个作业同步,内存调大。不然会影响同批次别的表的同步。


觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

FlinkCDC-2.0-原理与实践
http://yuting0907.github.io/2022/12/06/FlinkCDC-2-0-原理与实践/
作者
Echo Yu
发布于
2022年12月6日
许可协议