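DataX Source Code Analysis 03: Data Transfer

Preface

The previous article walked through DataX's scheduling flow in detail. This article goes a level deeper and examines how DataX transfers and exchanges data.

As noted there, the core unit of execution in DataX is the TaskExecutor. Each TaskExecutor owns two threads, WriterThread and ReaderThread, which together carry out the entire data transfer, so this article focuses on these two threads.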

Thread Creation

In the TaskGroupContainer source, find the place where TaskExecutor creates the WriterThread and ReaderThread:

/**
 * Create the writerThread
 */
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
        String.format("%d-%d-%d-writer",
                jobId, taskGroupId, this.taskId));
// Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
        PluginType.WRITER, this.taskConfig.getString(
                CoreConstant.JOB_WRITER_NAME)));

/**
 * Create the readerThread
 */
readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
        String.format("%d-%d-%d-reader",
                jobId, taskGroupId, this.taskId));
/**
 * Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
 */
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
        PluginType.READER, this.taskConfig.getString(
                CoreConstant.JOB_READER_NAME)));
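
To make the class loader step concrete, here is a minimal sketch using only the JDK. It is not DataX code: `ContextClassLoaderSketch` and `loaderSeenByThread` are hypothetical names, and an empty `URLClassLoader` stands in for whatever `LoadUtil.getJarLoader` returns for a plugin. It only shows that a loader set with `setContextClassLoader` is the one the thread sees while it runs.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of DataX.
class ContextClassLoaderSketch {

    // Starts a named thread with its own context class loader and returns
    // the loader that the thread observes from inside its run method.
    static ClassLoader loaderSeenByThread(ClassLoader pluginLoader, String threadName)
            throws InterruptedException {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread worker = new Thread(
                () -> seen.set(Thread.currentThread().getContextClassLoader()),
                threadName);
        worker.setContextClassLoader(pluginLoader);
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: an empty URLClassLoader stands in for the per-plugin jar loader.
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0],
                ContextClassLoaderSketch.class.getClassLoader())) {
            // Same thread-name pattern as above: jobId-taskGroupId-taskId-writer.
            String name = String.format("%d-%d-%d-writer", 1, 0, 0);
            ClassLoader seen = loaderSeenByThread(pluginLoader, name);
            System.out.println(seen == pluginLoader); // prints "true"
        }
    }
}
```

Giving each plugin thread its own loader is a common way to keep a plugin's dependencies isolated from those of the host program and of other plugins.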

WriterRunner and ReaderRunner

  • 1. WriterRunner implements the Runnable interface:
public class WriterRunner extends AbstractRunner implements Runnable {

    private static final Logger LOG = LoggerFactory
            .getLogger(WriterRunner.class);

    private RecordReceiver recordReceiver;

    public void setRecordReceiver(RecordReceiver receiver) {
        this.recordReceiver = receiver;
    }

    public WriterRunner(AbstractTaskPlugin abstractTaskPlugin) {
        super(abstractTaskPlugin);
    }

    @Override
    public void run() {
        Validate.isTrue(this.recordReceiver != null);

        Writer.Task taskWriter = (Writer.Task) this.getPlugin();
        // Track waitReadTime; it is ended in the finally block
        PerfRecord channelWaitRead = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_READ_TIME);
        try {
            channelWaitRead.start();
            LOG.debug("task writer starts to do init ...");
            PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_INIT);
            initPerfRecord.start();
            taskWriter.init();
            initPerfRecord.end();

            LOG.debug("task writer starts to do prepare ...");
            PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_PREPARE);
            preparePerfRecord.start();
            taskWriter.prepare();
            preparePerfRecord.end();
            LOG.debug("task writer starts to write ...");

            PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DATA);
            dataPerfRecord.start();
            taskWriter.startWrite(recordReceiver);

            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();

            LOG.debug("task writer starts to do post ...");
            PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_POST);
            postPerfRecord.start();
            taskWriter.post();
            postPerfRecord.end();
            // ... (the listing is truncated here; the rest of run() is not shown)

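The core run method of WriterRunner invokes each lifecycle phase of the Writer plugin (init, prepare, startWrite, post) and records how long each phase takes. Most importantly, it contains the entry point where WriterRunner starts writing data: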

taskWriter.startWrite(recordReceiver);

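The medium through which WriterRunner fetches data before writing it out is the mysterious recordReceiver. Alongside the thread-creation code shown earlier, there is also code that sets the recordReceiver: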

((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
this.channel, pluginCollector));
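  • 2. ReaderRunner implements the Runnable interface.

    The core run method of ReaderRunner likewise invokes each lifecycle phase of the Reader plugin and records how long each phase takes. Most importantly, it contains the entry point where ReaderRunner starts reading data: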

    taskReader.startRead(recordSender);

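    The medium through which the ReaderThread hands off the data it reads is the mysterious recordSender. Alongside the thread-creation code shown earlier, there is also code that sets the recordSender: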

    ((ReaderRunner) newRunner).setRecordSender(recordSender);

    In summary, the read thread's read operation depends on RecordSender.
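
The lifecycle-plus-timing pattern that both runners follow can be sketched as below. This is a simplified illustration under stated assumptions, not the DataX implementation: `LifecycleRunnerSketch`, `TaskPlugin`, `transfer`, and `timed` are hypothetical names, `transfer` stands in for `startWrite(recordReceiver)` or `startRead(recordSender)`, and a plain map replaces `PerfRecord`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a runner; not the DataX classes.
class LifecycleRunnerSketch implements Runnable {

    // Assumption: a reduced plugin interface with the phases named in the text.
    interface TaskPlugin {
        void init();
        void prepare();
        void transfer(); // stands in for startWrite(...) or startRead(...)
        void post();
    }

    private final TaskPlugin plugin;
    // Phase name -> elapsed nanoseconds, kept in call order.
    final Map<String, Long> phaseNanos = new LinkedHashMap<>();

    LifecycleRunnerSketch(TaskPlugin plugin) {
        this.plugin = plugin;
    }

    // Runs one phase and records its duration even if the phase throws.
    private void timed(String phase, Runnable step) {
        long start = System.nanoTime();
        try {
            step.run();
        } finally {
            phaseNanos.put(phase, System.nanoTime() - start);
        }
    }

    @Override
    public void run() {
        timed("init", plugin::init);
        timed("prepare", plugin::prepare);
        timed("transfer", plugin::transfer);
        timed("post", plugin::post);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> calls = new ArrayList<>();
        TaskPlugin plugin = new TaskPlugin() {
            public void init() { calls.add("init"); }
            public void prepare() { calls.add("prepare"); }
            public void transfer() { calls.add("transfer"); }
            public void post() { calls.add("post"); }
        };
        LifecycleRunnerSketch runner = new LifecycleRunnerSketch(plugin);
        Thread thread = new Thread(runner, "0-0-0-writer");
        thread.start();
        thread.join(); // join makes the runner's writes visible to this thread
        System.out.println(calls);                      // prints "[init, prepare, transfer, post]"
        System.out.println(runner.phaseNanos.keySet()); // prints "[init, prepare, transfer, post]"
    }
}
```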

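The read thread and the write thread each have a corresponding in-memory exchange model for passing data, so the rest of the analysis turns to RecordReceiver and RecordSender.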

BufferedRecordExchanger

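  • Opening the RecordReceiver source shows that it is an interface with three implementations. Judging by their names, there is a one-to-one exchange implementation as well as one-to-many buffered exchange implementations. For performance, the DataX code actually uses BufferedRecordExchanger: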

public interface RecordReceiver {
    public Record getFromReader();
    public void shutdown();
}

  • Like RecordReceiver, RecordSender is also an interface, and it has the same set of implementations as RecordReceiver. For performance, the DataX code actually uses BufferedRecordExchanger:
public interface RecordSender {

    public Record createRecord();

    public void sendToWriter(Record record);

    public void flush();

    public void terminate();

    public void shutdown();
}

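  • BufferedRecordExchanger implements both interfaces.

    Inside the class we find the Channel memory model object mentioned earlier. Data is exchanged between RecordSender and RecordReceiver through this Channel object: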

    @Override
    public void sendToWriter(Record record) {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }

        // Message: "record must not be null."
        Validate.notNull(record, "record不能为空.");

        // A single record larger than the byte capacity is reported as a dirty record and skipped.
        // Message: "a single record exceeds the size limit; the current limit is: %s"
        if (record.getMemorySize() > this.byteCapacity) {
            this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
            return;
        }

        // Flush first when the buffer is full by record count or by bytes.
        boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
        if (isFull) {
            flush();
        }

        this.buffer.add(record);
        this.bufferIndex++;
        memoryBytes.addAndGet(record.getMemorySize());
    }

    @Override
    public void flush() {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        // Push the whole batch into the channel, then reset the buffer state.
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

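    The send path is simple, ordinary buffering: each record is first added to the buffer, and once the buffer is full (by record count or by byte size) the whole batch is pushed into the channel.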

    @Override
    public Record getFromReader() {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        boolean isEmpty = (this.bufferIndex >= this.buffer.size());
        if (isEmpty) {
            receive();
        }

        Record record = this.buffer.get(this.bufferIndex++);
        if (record instanceof TerminateRecord) {
            record = null;
        }
        return record;
    }

    private void receive() {
        this.channel.pullAll(this.buffer);
        this.bufferIndex = 0;
        this.bufferSize = this.buffer.size();
    }

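    The receive path is just as simple: records are served from the buffer first, and when the buffer is exhausted it is refilled from the channel. A TerminateRecord is returned to the caller as null, which signals the end of the data.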
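
The send and receive paths described above can be sketched end to end as follows. This is an illustration under assumptions, not the DataX class: `BufferedExchangeSketch` and `Rec` are hypothetical names, a `LinkedBlockingQueue` stands in for the Channel, a record's size is taken to be its string length, the sentinel `TERMINATE` plays the role of `TerminateRecord`, and `terminate()` is assumed to flush before pushing the sentinel. As in DataX, the sending side and the receiving side each use their own instance over a shared channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for a buffered exchanger; not the DataX class.
class BufferedExchangeSketch {

    static final class Rec {
        final String value;
        Rec(String value) { this.value = value; }
        long memorySize() { return value.length(); } // assumption: size = string length
    }

    // End-of-stream sentinel, compared by identity.
    static final Rec TERMINATE = new Rec("");

    private final BlockingQueue<Rec> channel;
    private final int bufferSize;    // maximum records per batch
    private final long byteCapacity; // maximum bytes per batch
    private final List<Rec> buffer = new ArrayList<>();
    private int bufferIndex = 0;     // read position, used by the receiving side
    private long memoryBytes = 0;    // buffered bytes, used by the sending side

    BufferedExchangeSketch(BlockingQueue<Rec> channel, int bufferSize, long byteCapacity) {
        this.channel = channel;
        this.bufferSize = bufferSize;
        this.byteCapacity = byteCapacity;
    }

    // Returns false for an oversized record (DataX reports it as a dirty record).
    boolean send(Rec record) throws InterruptedException {
        if (record.memorySize() > byteCapacity) {
            return false;
        }
        boolean isFull = buffer.size() >= bufferSize
                || memoryBytes + record.memorySize() > byteCapacity;
        if (isFull) {
            flush();
        }
        buffer.add(record);
        memoryBytes += record.memorySize();
        return true;
    }

    // Pushes the whole batch into the channel and resets the buffer.
    void flush() throws InterruptedException {
        for (Rec r : buffer) {
            channel.put(r);
        }
        buffer.clear();
        memoryBytes = 0;
    }

    // Assumption: terminating flushes what is left, then sends the sentinel.
    void terminate() throws InterruptedException {
        flush();
        channel.put(TERMINATE);
    }

    // Serves from the buffer; refills from the channel when the buffer is exhausted.
    Rec receive() throws InterruptedException {
        if (bufferIndex >= buffer.size()) {
            buffer.clear();
            buffer.add(channel.take()); // block until at least one record arrives
            channel.drainTo(buffer);    // then take whatever else is already queued
            bufferIndex = 0;
        }
        Rec r = buffer.get(bufferIndex++);
        return r == TERMINATE ? null : r;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Rec> channel = new LinkedBlockingQueue<>();
        BufferedExchangeSketch sender = new BufferedExchangeSketch(channel, 2, 8);
        BufferedExchangeSketch receiver = new BufferedExchangeSketch(channel, 2, 8);

        sender.send(new Rec("aa"));
        sender.send(new Rec("bb"));
        System.out.println(channel.size()); // prints "0": both records are still buffered
        sender.send(new Rec("cc"));         // buffer already holds 2 records, so it is flushed first
        System.out.println(channel.size()); // prints "2"
        sender.terminate();

        for (Rec r = receiver.receive(); r != null; r = receiver.receive()) {
            System.out.println(r.value);    // prints "aa", "bb", "cc" on separate lines
        }
    }
}
```

Batching this way means the channel is touched once per batch rather than once per record, which is the performance motivation the article attributes to the buffered exchanger.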

Channel

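As shown above, Channel is the basic unit of data storage, and users can provide their own implementation of this abstraction to suit different needs.

The memory model defines the statistics and rate-limiting behavior as well as the push and pull operations, which together form the core producer-consumer model. The open-source DataX code currently ships only one Channel implementation: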

MemoryChannel

Its two core methods are doPush and doPull:

@Override
protected void doPush(Record r) {
    try {
        long startTime = System.nanoTime();
        this.queue.put(r);
        waitWriterTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(r.getMemorySize());
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

@Override
protected Record doPull() {
    try {
        long startTime = System.nanoTime();
        Record r = this.queue.take();
        waitReaderTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(-r.getMemorySize());
        return r;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

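As the source shows, doPush and doPull exchange data through the queue object, which is backed by an ArrayBlockingQueue. Pushing data calls the queue's put method, which blocks while the queue is full; pulling data calls its take method, which blocks while the queue is empty. That completes the DataX data exchange flow.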
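
The blocking put/take hand-off can be tried in isolation with the following sketch. It is a simplified illustration, not MemoryChannel itself: `MemoryChannelSketch`, `EOF`, and `transfer` are hypothetical names, records are plain strings whose length is used as their size, and the wait-time statistics and rate limiting are left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for an in-memory channel; not the DataX class.
class MemoryChannelSketch {

    static final String EOF = "EOF"; // assumption: a plain marker string ends the stream

    private final ArrayBlockingQueue<String> queue;
    private final AtomicLong memoryBytes = new AtomicLong();

    MemoryChannelSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void push(String r) throws InterruptedException {
        queue.put(r); // blocks while the queue is full
        memoryBytes.addAndGet(r.length());
    }

    String pull() throws InterruptedException {
        String r = queue.take(); // blocks while the queue is empty
        memoryBytes.addAndGet(-r.length());
        return r;
    }

    long bytesInFlight() {
        return memoryBytes.get();
    }

    // Moves `records` items from a producer thread to a consumer thread
    // through a bounded channel and returns how many the consumer received.
    static long transfer(int records, int capacity) throws InterruptedException {
        MemoryChannelSketch channel = new MemoryChannelSketch(capacity);
        AtomicLong received = new AtomicLong();

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    channel.push("r" + i);
                }
                channel.push(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        Thread writer = new Thread(() -> {
            try {
                while (!channel.pull().equals(EOF)) {
                    received.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        reader.start();
        writer.start();
        reader.join();
        writer.join();
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(1000, 4)); // prints "1000"
    }
}
```

Because the queue is bounded, a fast producer is slowed to the consumer's pace instead of filling memory, which is the back-pressure a bounded blocking queue provides.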

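Summary

This article analyzed, at a deeper level, the flow and principles of data exchange between Reader and Writer plugins. DataX's approach to concurrent data transfer and exchange has the following characteristics:

1. A single abstract memory model coordinates producers and consumers (the ArrayBlockingQueue producer-consumer model).
2. Multithreading lets reads and writes run asynchronously.
3. Buffering is used judiciously to improve data transfer performance.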


DataX Source Code Analysis 03: Data Transfer
http://yuting0907.github.io/posts/2025/09/e90c287d.html
Author
Echo Yu
Published on
September 1, 2025
License