DataX源码解析-02调度流程

前言

上篇讲了DataX的整体架构,已经对DataX的架构和运行流程有了一个比较细致的了解。这篇主要集中于DataX在调度方面的深层细节。回顾DataX的执行路径为:

  • 启动类Engine.java-> entry() |根据mode初始化 AbstarctContainer(JobContainer/TaskGroupContainer)

  • 启动容器start()

    • JobContainer容器会执行preHander()、init()、prepare()、split()、scheduler()、post()、postHandle()、invokeHooks(),它是所有任务的master,负责任务的初始化、拆分、调度、监控和回报,但并不做实际的数据同步

      • scheduler():startAllTaskGroup,启动所有的taskgroup。
    • 线程启动后,会启动TaskGroupContainer来运行一个taskgroup里的全部任务。

      • 根据通道数构建TaskExecutor加入执行队列。

      • TaskExecute是TaskGroupContainer的内部类,是一个基本单位task的具体执行管理的地方。

        初始化一些信息,比如初始化读写线程,实例化存储读数据的管道,获取transformer的参数等。
        初始化之后开启读写线程,正式开始单个task(一部分数据同步任务)正式启动。

        读操作(ReaderRunner)利用jdbc,把从数据库中读出来的每条数据封装为一个个Record放入Channel中,当数据读完时,结束的时候会写入一个TerminateRecord标识。

        写操作(WriterRunner)不断从Channel中读取Record,直到读到TerminateRecord标识数据以取完,把数据全部读入数据库中

调度流程解析

JobContainer的scheduler方法中:

1.计算需要的channel个数

1
2
3
4
5
6
7
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskNumber = this.configuration.getList(
CoreConstant.DATAX_JOB_CONTENT).size();

this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
PerfTrace.getInstance().setChannelNumber(needChannelNumber);

2.计算TaskGroup的个数

任务的分配是由JobAssignUtil去进行,assignFairly方法分配的逻辑是公平分配,使用的Round Robin算法,轮询分配到每个TaskGroup中,在计算出真正需要的channel数量之后,根据每个TaskGroup应该被分配任务的个数,计算TaskGroup的个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);

LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);

//设置 executeMode
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}

if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
if (this.jobId <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
}
}

LOG.info("Running by {} Mode.", executeMode);

this.startTransferTimeStamp = System.currentTimeMillis();

scheduler.schedule(taskGroupConfigs);

3.启动调度

  • AbstractScheduler schedule() 抽象类
1
startAllTaskGroup(configurations);
  • ProcessInnerScheduler extends AbstractScheduler 实现抽象类 startAllTaskGroup()

    DataX底层对于每个taskGroup都启动了一个线程TaskGroupContainerRunner,采用线程池的方式实现并发操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private ExecutorService taskGroupContainerExecutorService;

@Override
public void startAllTaskGroup(List<Configuration> configurations) {
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());

for (Configuration taskGroupConfiguration : configurations) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}

this.taskGroupContainerExecutorService.shutdown();
}
  • TaskGroupContainerRunner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TaskGroupContainerRunner implements Runnable {

private TaskGroupContainer taskGroupContainer;

private State state;

public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
this.taskGroupContainer = taskGroup;
this.state = State.SUCCEEDED;
}

@Override
public void run() {
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
}

taskGroupContainer的start方法中:

1.启动task

每个子任务也就是最小的并发单位的执行器是TaskExecutor, TaskExecutor为taskGroupContainer的内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();

iterator.remove();
runTasks.add(taskExecutor);

//上面,增加task到runTasks列表,因此在monitor里注册。
taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

taskFailedExecutorMap.remove(taskId);
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
this.taskGroupId, taskId, attemptCount);

2.taskExecutor.doStart();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void doStart() {
this.writerThread.start();

// reader没有起来,writer不可能结束
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}

this.readerThread.start();

// 这里reader可能很快结束
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
// 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}

}

从这个类里面可以看见,做核心传输逻辑的变量是两个线程,一个写的线程,一个读的线程,在doStart方法中,启动了这两个线程,到这里整个任务被完全启动了起来。

总结

DataX整个调度依赖于java底层线程池,任务调度包括如下步骤:

1.根据流控和并发配置确定分片数量

2.根据分片的数量确定TaskGroupContainer个数

3.根据轮询的公平调度原则分配task给TaskGroupContainer

4.每个TaskGroupContainer包含多个TaskExecutor

5.TaskExecutor启动ReaderThread和WriterThread

下篇将详细解析TaskExecutor中的ReaderThread和WriterThread之间如何交换数据!


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

DataX源码解析-02调度流程
http://yuting0907.github.io/posts/2025/09/70307113.html
作者
Echo Yu
发布于
2025年9月1日
许可协议