DataX源码解析-01整体架构

前言

近期在工作中需要用到DataX去作为公司内部离线数据同步引擎,好奇心驱使就花了一些时间研究了DataX的整体架构和设计思想,从中吸收了很多优秀的设计思路,作为一款纯Java实现的数据同步工具,相对于市面上已存在的基于大数据框架为背景的数据同步工具有着易部署、易扩展的优点,但不足的地方是alibaba只是开源了DataX单机模式代码,并未开源分布式部分代码,目前在Github中的只是阉割版是DataX。

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

https://raw.gitmirror.com/YUTING0907/PicGo/main/img20250901124943.png

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX核心架构概览

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,**会根据不同的源端切分策略,将Job切分成多个小的Task(子任务)**,以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX运行流程简单举例

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到doris里面。

DataX执行的思路为:

  • ① DataXJob根据表的数量切分成了100个Task。
  • ② DataX 默认给每个 TaskGroup分配 5 个 Channel,根据20个并发,DataX计算共需要分配20/5=4个TaskGroup(默认单个任务组的并发数量为5)。
  • ③ 根据 DataX 的公平分配策略,4个TaskGroup平分切分好100个Task,每一个TaskGroup共计运行100/4=25个Task(负责以5个并发)。

由于一个 Channel 对应一个线程执行,因此 DataX 的线程模型可以用如下图表示:

DataX源码解析

程序入口

从官网得知,要启动一个简单的数据任务,要经历以下二步:

  1. 配置任务json

  2. 使用DataX自带的Python脚本启动任务

    python datax.py job.json

在扒了datax.py的源码之后,发现使用python去进行启动的作用是快速构建了DataX的启动命令,尤其是补充了一些有用的jvm参数,实际上最后python拼接出来的命令如下:

1
2
java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}

我们根据这条命令就会发现入口的主类:com.alibaba.datax.core.Engine

主程序执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
exitCode = 1;
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));

if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}

System.exit(exitCode);
}
System.exit(exitCode);
}

从上面代码,一切的一切都是从Engine.entry(args)这个方法开始,接下来分析一下这个方法究竟做了哪些工作:

1.解析命令行参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void entry(final String[] args) throws Throwable {
// 解析命令行参数
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");

BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);

//获取job json路径
String jobPath = cl.getOptionValue("job");

// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
// 获取执行方式 阉割版默认standalone
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration configuration = ConfigParser.parse(jobPath);
// 绑定i18n信息
MessageSource.init(configuration);
MessageSource.reloadResourceBundle(Configuration.class);

‘job’:获取job json路径;

“jobid”:当前job的id,如果用户未显式指定,那么将会被置为-1;

“mode”:当前job的运行模式,当前DataX开源版本只支持Standalone模式

2.解析配置,ConfigParser.parse(jobPath),往配置中加入系统默认项,用户只提供job部分,补上其余common core entry job plugin配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
{
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": [
"yyyyMMdd"
],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
},
"plugin": {
"reader": {
"streamreader": {
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"mechanism": "use datax framework to transport data from stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamreader",
"path": "/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/plugin/reader/streamreader"
}
},
"writer": {
"streamwriter": {
"class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
"description": {
"mechanism": "use datax framework to transport data to stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamwriter",
"path": "/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/plugin/writer/streamwriter"
}
}
}
}

3.打印当前jvm虚拟机信息

1
2
3
4
5
6
//打印vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
LOG.info(vmInfo.toString());
}

4.打印当前配置

1
2
3
4
LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

LOG.debug(configuration.toJSON());

5.配置校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConfigurationValidate {
public static void doValidate(Configuration allConfig) {
Validate.isTrue(allConfig!=null, "");

coreValidate(allConfig);

pluginValidate(allConfig);

jobValidate(allConfig);
}

private static void coreValidate(Configuration allconfig) {
return;
}

private static void pluginValidate(Configuration allConfig) {
return;
}

private static void jobValidate(Configuration allConfig) {
return;
}
}

其实毫无校验逻辑,比较敷衍。

6.启动任务

1
2
Engine engine = new Engine();
engine.start(configuration);

任务执行流程

此时程序进入到了Engine.start(configuration)的执行流程,在这一步中经历以下环节:

1.绑定Column转换格式,这一步会在配置中指定以下信息:

字符串转换格式

日期转换格式

字节编码格式

1
2
3
4
public void start(Configuration allConf) {

// 绑定column转换信息
ColumnCast.bind(allConf);

2.初始化PluginLoader

PluginLoader可以理解是所有数据同步插件的统一加载器,在这一步中实际上是将当前的任务配置赋值给了PluginLoader一份:

1
2
3
4
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);

3.实例化任务容器

判断配置中core.container.model是不是taskGroup。

如果是taskGroup实例化TaskGroupContainer,如果不是,实例化JobContainer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber =0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

} else {
container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}

4.初始化性能追踪器,使用单例模式,一个jvm进程中只存在一个PerfTrace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);

/**
* 单实例
*
* @param isJob
* @param jobId
* @param taskGroupId
* @return
*/
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {

if (instance == null) {
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
}
}
}
return instance;
}

5.启动第3步实例化好的任务容器

1
container.start();

任务容器执行流程

任务容器这里是一个抽象类,实现类有JobContainer和TaskGroupContainer。

任务容器被启动后,会执行任务生命周期的每一个阶段

  1. preHandle:用户可配置任务处理前的前置处理逻辑,这个前置处理逻辑处于writer或者reader中

  2. init:任务初始化阶段,初始化reader和writer(使用自定义的classLoader进行类加载),调用reader和writer的init()方法

  3. prepare:任务准备阶段,实际上调用reader和writer的prepare()方法

  4. split:任务分片,调整channel数,其实也就是任务的并发数

    • 调整channel数量

    ​ 判断是否设置了字节限制job.setting.speed.byte,如果是,根据字节限制配置计算channel数量

    ​ 判断是否设置了条数限制job.setting.speed.record,如果是,根据条数限制配置计算channel数量

    ​ 取以上1 2步的最小值。

    ​ 如果1 2步都没有设置,取配置中的job.setting.speed.channel来确定channel数量

    • 根据channel数量切分configuration,包括readerConfig和writerConfig,切分逻辑在每个插件中具体实现的,在切分好配置之后会将配置覆盖进 job.content
  5. schedule:任务调度执行阶段,根据第四步中确定的并发数进行任务调度工作

    • 根据上面的split中算出的channel数,设置taskGroup的数量,默认一个taskGroup中运行5个channel,切分configuration,为每个taskGroup创建自己的configuration

    • 开始对taskGroupConfigurations进行调度,底层使用线程池,通过taskGroup的数量来确定线程池的线程个数

    • 启动线程,TaskGroupContainerRunner,每一个TaskGroupContainerRunner中包含了一个TaskGroupContainer,实际上执行任务核心逻辑的容器是TaskGroupContainer,TaskGroupContainer会为每个channel创建TaskExecutor,TaskExecutor是执行子任务的最小单位

    • while循环监控taskGroup运行状态,等待每个子任务完成和数据情况的上报,并收集

  6. post:任务执行结束后阶段

  7. postHandler:任务结束后后置处理逻辑,这个后置处理逻辑处于writer或者reader中

  8. invokeHooks:DataX预留的spi接口,用户可自定义spi插件来丰富整个同步任务的生命周期

总结

DataX启动路径为:

  • 启动类Engine.java-> entry() |根据mode初始化 AbstarctContainer(JobContainer/TaskGroupContainer)

  • 启动容器start()

    • JobContainer容器会执行preHander()、init()、prepare()、split()、scheduler()、post()、postHandle()、invokeHooks(),它是所有任务的master,负责任务的初始化、拆分、调度、监控和回报,但并不做实际的数据同步

      • scheduler():startAllTaskGroup,启动所有的taskgroup。
    • 线程启动后,会启动TaskGroupContainer来运行一个taskgroup里的全部任务。

      • 根据通道数构建TaskExecutor加入执行队列。

      • TaskExecute是TaskGroupContainer的内部类,是一个基本单位task的具体执行管理的地方。

        初始化一些信息,比如初始化读写线程,实例化存储读数据的管道,获取transformer的参数等。
        初始化之后开启读写线程,正式开始单个task(一部分数据同步任务)正式启动。

        读操作(ReaderRunner)利用jdbc,把从数据库中读出来的每条数据封装为一个个Record放入Channel中,当数据读完时,结束的时候会写入一个TerminateRecord标识。

        写操作(WriterRunner)不断从Channel中读取Record,直到读到TerminateRecord标识数据以取完,把数据全部读入数据库中

Reference:

https://juejin.cn/post/6869755803570536461?from=search-suggest

https://blog.csdn.net/qq_20042935/article/details/122584478


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

DataX源码解析-01整体架构
http://yuting0907.github.io/posts/2025/09/fc450966.html
作者
Echo Yu
发布于
2025年9月1日
许可协议