DataX源码解析-04插件加载原理

前言

前面几篇DataX系列源码解析,分别讲了整体架构、调度流程、数据传输,这篇将详细介绍DataX的各类插件是如何进行加载的。

JobContainer.start()

在JobContainer中,进入start()方法的this.init()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Prepares the job for execution: resolves the job id, names the current
 * thread after it, and initializes the reader and writer job plugins.
 */
private void init() {
    this.jobId = this.configuration.getLong(
            CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

    // A negative id means none was configured: fall back to 0 and write it
    // back into the configuration.
    final boolean jobIdMissing = this.jobId < 0;
    if (jobIdMissing) {
        LOG.info("Set jobId = 0");
        this.jobId = 0;
        this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                this.jobId);
    }

    final Thread currentThread = Thread.currentThread();
    currentThread.setName("job-" + this.jobId);

    // One collector, built on the container communicator, is shared by both plugins.
    final JobPluginCollector collector = new DefaultJobPluginCollector(
            this.getContainerCommunicator());

    // The reader must be initialized before the writer.
    this.jobReader = this.initJobReader(collector);
    this.jobWriter = this.initJobWriter(collector);
}

进入this.jobReader = this.initJobReader(jobPluginCollector);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Loads and initializes the job-level reader plugin named in the job configuration.
 *
 * <p>While the plugin is loaded and initialized, the current thread's class
 * loader is switched to the plugin's own JarLoader. It is switched back in a
 * {@code finally} block, so a failure while loading or initializing the plugin
 * cannot leave the plugin's loader installed on the calling thread.
 *
 * @param jobPluginCollector collector handed to the reader plugin
 * @return the initialized reader job plugin
 */
private Reader.Job initJobReader(
        JobPluginCollector jobPluginCollector) {
    this.readerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

    // Done outside the try: if resolving the JarLoader fails, nothing was
    // swapped and there is nothing to restore.
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    try {
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // The reader's own job parameters.
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // The peer (writer) parameters, made available to the reader.
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();

        return jobReader;
    } finally {
        // Always switch the thread back, including when init() throws.
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }
}
  • 首先通过配置文件获取插件的名称
  • 保存当前classLoader,并将当前线程的classLoader设置为所给对应的JarLoader
  • 加载Reader插件的实现类
  • 初始化Reader的参数
  • 执行jobReader的init方法
  • 将当前线程的类加载器设置为之前保存的类加载器,即恢复之前的线程上下文类加载器

LoadUtil.loadJobPlugin()

com.alibaba.datax.core.util.container.LoadUtil#loadJobPlugin

先通过 Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job); 加载对应的插件类,再通过反射创建插件实例并设置其插件配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Loads a job-level plugin: resolves its class, creates an instance through
 * the no-argument constructor, and attaches the plugin's configuration.
 * Used for both reader and writer plugins.
 *
 * @param pluginType the kind of plugin to load (e.g. {@code PluginType.READER})
 * @param pluginName the plugin's configured name
 * @return a new job plugin instance with its plugin configuration set
 * @throws DataXException with {@code FrameworkErrorCode.RUNTIME_ERROR} when the
 *         plugin cannot be instantiated or configured; the cause is preserved
 */
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
        String pluginName) {
    Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
            pluginType, pluginName, ContainerType.Job);

    try {
        // Class.newInstance() is deprecated and rethrows checked constructor
        // exceptions undeclared; going through the Constructor wraps them in
        // InvocationTargetException, which the catch below handles.
        AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                .getDeclaredConstructor().newInstance();
        jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
        return jobPlugin;
    } catch (Exception e) {
        // This path is a failure to create the plugin, so the message says so.
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR,
                String.format("DataX无法实例化plugin[%s]的Job插件.",
                        pluginName), e);
    }
}

LoadUtil.loadPluginClass()

com.alibaba.datax.core.util.container.LoadUtil#loadPluginClass

这里先获取到插件对应的JarLoader,再通过JarLoader的loadClass方法加载plugin.json中配置的class所对应的内部类(类名为:配置的class + "$" + 运行类型,例如Job)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Resolves the implementation class of a plugin (the class, not an instance).
 *
 * <p>The class name is the plugin configuration's {@code "class"} value,
 * followed by {@code "$"} and the value of the given run type; it is loaded
 * through the plugin's own JarLoader.
 *
 * @param pluginType    the kind of plugin
 * @param pluginName    the plugin's configured name
 * @param pluginRunType the container type whose value selects the nested class
 * @return the loaded plugin class
 * @throws DataXException with {@code FrameworkErrorCode.RUNTIME_ERROR} when
 *         building the class name or loading the class fails
 */
@SuppressWarnings("unchecked")
private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
        PluginType pluginType, String pluginName,
        ContainerType pluginRunType) {
    Configuration conf = getPluginConf(pluginType, pluginName);
    JarLoader loader = LoadUtil.getJarLoader(pluginType, pluginName);
    try {
        // Built inside the try so that a failure here is wrapped as well.
        String nestedClassName = conf.getString("class") + "$"
                + pluginRunType.value();
        Class<?> loaded = loader.loadClass(nestedClassName);
        return (Class<? extends AbstractPlugin>) loaded;
    } catch (Exception e) {
        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    }
}

LoadUtil.getJarLoader()

com.alibaba.datax.core.util.container.LoadUtil#getJarLoader

根据类型和名称从缓存中获取,如果没有则去创建。首先获取插件的路径,比如:"path": "D:\DataX\target\datax\datax\plugin\reader\mysqlreader",
然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。
创建单独的JarLoader,把创建的JarLoader缓存起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Returns the JarLoader for the given plugin, creating and caching it on first use.
 *
 * <p>The plugin configuration is looked up first, even when a cached loader
 * exists. A new loader is built from the configuration's {@code "path"} value.
 *
 * @param pluginType the kind of plugin
 * @param pluginName the plugin's configured name
 * @return the cached or newly created JarLoader
 * @throws DataXException with {@code FrameworkErrorCode.RUNTIME_ERROR} when a
 *         loader has to be created and the configured path is blank
 */
public static synchronized JarLoader getJarLoader(PluginType pluginType,
        String pluginName) {
    Configuration conf = getPluginConf(pluginType, pluginName);

    JarLoader cached = jarLoaderCenter.get(generatePluginKey(pluginType,
            pluginName));
    if (cached != null) {
        return cached;
    }

    // Cache miss: build a loader from the plugin's directory.
    String path = conf.getString("path");
    if (StringUtils.isBlank(path)) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR,
                String.format(
                        "%s插件[%s]路径非法!",
                        pluginType, pluginName));
    }

    JarLoader created = new JarLoader(new String[]{path});
    jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
            created);
    return created;
}

自定义类加载器JarLoader

DataX通过自定义类加载器JarLoader,提供Jar隔离的加载机制。

JarLoader继承URLClassLoader,扩充了可以加载目录的功能。可以从指定的目录下,把传入的路径、及其子路径、以及路径中的jar文件加入到class path下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

/**
 * A {@link URLClassLoader} that loads classes from directories of jars.
 *
 * <p>Each path passed in, plus every directory beneath it, is scanned for
 * files whose name ends in {@code .jar}; those jars become the loader's URLs.
 */
public class JarLoader extends URLClassLoader {

    /**
     * Creates a loader over the given directories, using the class loader that
     * loaded {@code JarLoader} itself as the parent.
     *
     * @param paths root directories to scan; must be non-null and non-empty
     */
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    /**
     * Creates a loader over the given directories with an explicit parent.
     *
     * @param paths  root directories to scan; must be non-null and non-empty
     * @param parent the parent class loader
     */
    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    /**
     * Collects the URLs of all jars in the given roots and their subdirectories.
     */
    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar包路径不能为空.");

        // Each root comes first, followed by the directories found beneath it.
        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }

        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }

        return urls.toArray(new URL[0]);
    }

    /**
     * Recursively appends the absolute path of every directory below
     * {@code path} to {@code collector}. Blank, missing, non-directory, or
     * unlistable paths contribute nothing.
     */
    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }

        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }

        // listFiles() returns null when the directory cannot be read.
        File[] children = current.listFiles();
        if (null == children) {
            return;
        }

        for (File child : children) {
            if (!child.isDirectory()) {
                continue;
            }

            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }

    /**
     * Returns the URLs of the {@code .jar} files directly inside {@code path}.
     * The path must be a non-blank, existing directory.
     */
    private static List<URL> doGetURLs(final String path) {
        Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");

        File jarPath = new File(path);

        Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
                "jar包路径必须存在且为目录.");

        /* set filter */
        FileFilter jarFilter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.getName().endsWith(".jar");
            }
        };

        /* iterate all jar */
        File[] allJars = jarPath.listFiles(jarFilter);
        if (null == allJars) {
            // listFiles() returns null on an I/O error; report it as a plugin
            // initialization failure instead of a NullPointerException.
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_INIT_ERROR,
                    "系统加载jar包出错");
        }

        List<URL> jarURLs = new ArrayList<URL>(allJars.length);
        for (File jar : allJars) {
            try {
                jarURLs.add(jar.toURI().toURL());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_INIT_ERROR,
                        "系统加载jar包出错", e);
            }
        }

        return jarURLs;
    }
}


总结

DataX使用纯java编程,没有使用任何框架,其中编程思想和面向对象设计思路很值得学习。


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

DataX源码解析-04插件加载原理
http://yuting0907.github.io/posts/2025/09/cf5c40b3.html
作者
Echo Yu
发布于
2025年9月2日
许可协议