2025: DataX (25) - How Plugins Are Loaded


I. Plugin Categories

By function:

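reader: read plugins, e.g. mysqlReader, which reads data from MySQL;

writer: write plugins, e.g. mysqlWriter, which writes data to MySQL;

transformer: transforms intermediate results, e.g. SubstrTransformer, which extracts substrings;

handler: mainly does the preparation work before a job runs and the cleanup after it finishes.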

Plugin types are represented by the PluginType enum:

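public enum PluginType {
    READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");
    private final String pluginType;
    PluginType(String pluginType) { this.pluginType = pluginType; }
    // Returns the lowercase name, used when building keys such as "plugin.reader.mysqlreader"
    @Override public String toString() { return pluginType; }
}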

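By runtime level:

Job-level plugins: the Job part of a reader or writer, which runs once for the whole job

Task-level plugins: the Task part of a reader or writer, which runs inside each task and does the actual reading or writing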
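To make the Job/Task split concrete, here is a minimal, self-contained sketch of the convention that LoadUtil (section 3.2) relies on: a plugin's main class, the `class` entry in plugin.json, contains nested `Job` and `Task` classes, and the loader appends `$Job` or `$Task` to that class name. `DemoReader`, `DemoJobPlugin`, `DemoTaskPlugin` and `instantiate` are hypothetical names, not DataX types, and `Class.forName` stands in for the plugin's own JarLoader.

```java
// Hypothetical demo of the "outer class + nested Job/Task" convention.
// None of these classes are DataX types.
class NestedClassLoadingDemo {
    // Same naming rule as LoadUtil.loadPluginClass: "<class from plugin.json>$<Job|Task>".
    static Object instantiate(String pluginClass, String runType) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(pluginClass + "$" + runType);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        DemoJobPlugin job = (DemoJobPlugin) instantiate("DemoReader", "Job");
        DemoTaskPlugin task = (DemoTaskPlugin) instantiate("DemoReader", "Task");
        System.out.println(job.describe());  // runs once per job
        System.out.println(task.describe()); // runs in every task
    }
}

// Stand-ins for the real Job/Task base classes (assumption).
abstract class DemoJobPlugin { abstract String describe(); }

abstract class DemoTaskPlugin { abstract String describe(); }

// A hypothetical reader: the outer class only groups its Job and Task parts.
class DemoReader {
    public static class Job extends DemoJobPlugin {
        @Override String describe() { return "runs once per job"; }
    }

    public static class Task extends DemoTaskPlugin {
        @Override String describe() { return "runs in every task"; }
    }
}
```

The binary name of a nested class is `Outer$Inner`, which is why a single `class` entry in plugin.json is enough to reach both parts.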

----

II. Plugin Directory Structure

Top-level directory: xxx\DataX\target\datax\datax\plugin, which contains two subdirectories, reader and writer. The mysql plugins are used as the example below.


Each plugin directory contains:

libs: the jars this plugin depends on at run time;

xxx-xxx.jar: the packaged jar of the plugin itself;

plugin.json: the plugin's metadata (name, main class, description, developer), for example:

{
    "name": "mysqlreader",
    "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba"
}

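plugin_job_template.json: a template job JSON for this plugin; when the plugin is loaded, the user's input JSON is validated against the parameters in this template.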
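A small sketch of how this layout can be discovered on disk, assuming the structure `<plugin home>/<reader|writer>/<plugin name>/plugin.json` described above. `PluginDirScanner` and `findPluginDescriptors` are hypothetical names; DataX's own scan happens in ConfigParser (section III) and also parses the JSON, which this sketch does not do.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

class PluginDirScanner {
    // Maps plugin directory name -> path of its plugin.json, for one plugin type.
    // Directories without a plugin.json are skipped.
    static Map<String, Path> findPluginDescriptors(Path pluginHome, String type) throws IOException {
        Path typeDir = pluginHome.resolve(type);
        Map<String, Path> result = new TreeMap<>();
        if (!Files.isDirectory(typeDir)) {
            return result;
        }
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(typeDir, p -> Files.isDirectory(p))) {
            for (Path dir : dirs) {
                Path descriptor = dir.resolve("plugin.json");
                if (Files.isRegularFile(descriptor)) {
                    result.put(dir.getFileName().toString(), descriptor);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path home = Files.createTempDirectory("plugin");
        Path reader = Files.createDirectories(home.resolve("reader").resolve("mysqlreader"));
        Files.createDirectories(reader.resolve("libs"));
        Files.writeString(reader.resolve("plugin.json"), "{\"name\": \"mysqlreader\"}");
        Files.createDirectories(home.resolve("reader").resolve("emptydir")); // no plugin.json: skipped
        System.out.println(findPluginDescriptors(home, "reader").keySet()); // [mysqlreader]
    }
}
```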

----

III. How Plugins Are Loaded

1. Parse the job configuration and collect all plugin names

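ConfigParser.parse(final String jobPath) takes the path of the job file. It parses the job configuration, merges in the core configuration, and reads the reader, writer and (optional) handler plugin names from the result. It then merges in the configuration of those plugins, retrying once after 1 second if the first attempt fails, and returns the combined Configuration object.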

public static Configuration parse(final String jobPath) {
    Configuration configuration = parseJobConfig(jobPath);
    configuration.merge(parseCoreConfig(CoreConstant.DATAX_CONF_PATH), false);

    // TODO: config optimization, only capture the plugins that are actually needed
    String readerPluginName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
    String writerPluginName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

    String preHandlerName = configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
    String postHandlerName = configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

    Set<String> pluginList = new HashSet<>();
    pluginList.add(readerPluginName);
    pluginList.add(writerPluginName);

    if (StringUtils.isNotEmpty(preHandlerName)) {
        pluginList.add(preHandlerName);
    }
    if (StringUtils.isNotEmpty(postHandlerName)) {
        pluginList.add(postHandlerName);
    }
    try {
        configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
    } catch (Exception e) {
        // Swallow the exception to keep the log clean; the message is enough here.
        LOG.warn(String.format("Failed to load plugins [%s,%s], retrying in 1s... Exception:%s ",
                readerPluginName, writerPluginName, e.getMessage()));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            // ignored
        }
        // Second and final attempt: if this fails too, the exception propagates.
        configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
    }
    return configuration;
}
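The core of `parse` comes down to two small ideas: collect a de-duplicated set of plugin names (reader and writer always, handlers only when configured) and retry the plugin loading exactly once after a short pause. The sketch below isolates both using only JDK types; `PluginNameCollector`, `collect` and `retryOnce` are hypothetical names. It uses a `LinkedHashSet` (the original uses `HashSet`) only so the printed order is predictable and, unlike the original, it restores the interrupt flag if the sleep is interrupted.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Supplier;

class PluginNameCollector {
    // Reader and writer are required; pre/post handlers are added only when non-empty.
    static Set<String> collect(String reader, String writer, String preHandler, String postHandler) {
        Set<String> names = new LinkedHashSet<>();
        names.add(reader);
        names.add(writer);
        if (preHandler != null && !preHandler.isEmpty()) names.add(preHandler);
        if (postHandler != null && !postHandler.isEmpty()) names.add(postHandler);
        return names;
    }

    // "Try, wait, try once more": a failure on the second attempt propagates to the caller.
    static <T> T retryOnce(Supplier<T> action, long waitMillis) {
        try {
            return action.get();
        } catch (RuntimeException e) {
            System.err.println("load failed, retrying: " + e.getMessage());
            try {
                Thread.sleep(waitMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt flag instead of dropping it
            }
            return action.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect("mysqlreader", "mysqlwriter", "", null)); // [mysqlreader, mysqlwriter]
        int[] calls = {0};
        String result = retryOnce(() -> {
            if (calls[0]++ == 0) throw new IllegalStateException("plugin dir not ready");
            return "loaded";
        }, 10);
        System.out.println(result + " after " + calls[0] + " calls"); // loaded after 2 calls
    }
}
```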
2. Get each plugin's configuration (plugin.json) by plugin name

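The previous step produced the names of the plugins the job needs. This step scans the reader and writer plugin directories, following DataX's directory conventions, and reads the detailed configuration of each requested plugin. If not every requested plugin is found, loading fails with PLUGIN_INIT_ERROR.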

public static Configuration parsePluginConfig(List<String> wantPluginNames) {
    Configuration configuration = Configuration.newDefault();

    Set<String> replicaCheckPluginSet = new HashSet<>();
    int complete = 0;
    // Scan every plugin directory under the reader home
    for (final String each : ConfigParser.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
        Configuration eachReaderConfig = ConfigParser
                .parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
        if (eachReaderConfig != null) {
            configuration.merge(eachReaderConfig, true);
            complete += 1;
        }
    }

    // Scan every plugin directory under the writer home
    for (final String each : ConfigParser.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
        Configuration eachWriterConfig = ConfigParser
                .parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
        if (eachWriterConfig != null) {
            configuration.merge(eachWriterConfig, true);
            complete += 1;
        }
    }
    // Every requested plugin must have been found
    if (wantPluginNames != null && wantPluginNames.size() > 0
            && wantPluginNames.size() != complete) {
        throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,
                "Plugin loading failed; not all requested plugins were loaded: " + wantPluginNames);
    }
    return configuration;
}
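The completeness rule at the end of `parsePluginConfig` can be modeled without DataX's Configuration class. This is a simplified sketch under stated assumptions: installed plugins are given as an in-memory map from plugin type to plugin names (standing in for the directories and their plugin.json files), duplicate names are rejected (which is what the name `replicaCheckPluginSet` suggests, though `parseOnePluginConfig` is not shown here), and each selected plugin is keyed as `plugin.<type>.<name>`, the same format LoadUtil uses. `WantedPluginCheck` and `select` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class WantedPluginCheck {
    // Returns registry keys of the selected plugins; an empty wanted list selects everything.
    static List<String> select(Map<String, List<String>> installed, List<String> wanted) {
        Set<String> seen = new HashSet<>();      // stands in for replicaCheckPluginSet
        List<String> keys = new ArrayList<>();
        int complete = 0;
        for (Map.Entry<String, List<String>> entry : installed.entrySet()) {
            for (String name : entry.getValue()) {
                if (!seen.add(name)) {
                    throw new IllegalStateException("duplicate plugin name: " + name);
                }
                if (wanted.isEmpty() || wanted.contains(name)) {
                    keys.add(String.format("plugin.%s.%s", entry.getKey(), name));
                    complete++;
                }
            }
        }
        // Same rule as parsePluginConfig: every wanted name must have been found.
        if (!wanted.isEmpty() && wanted.size() != complete) {
            throw new IllegalStateException("not all requested plugins were loaded: " + wanted);
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, List<String>> installed = new LinkedHashMap<>();
        installed.put("reader", List.of("mysqlreader", "streamreader"));
        installed.put("writer", List.of("mysqlwriter"));
        // [plugin.reader.mysqlreader, plugin.writer.mysqlwriter]
        System.out.println(select(installed, List.of("mysqlreader", "mysqlwriter")));
        try {
            select(installed, List.of("mysqlreader", "hdfswriter"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // not all requested plugins were loaded: [mysqlreader, hdfswriter]
        }
    }
}
```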
3. Load plugins dynamically
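All plugins are loaded dynamically through a ClassLoader. To avoid class conflicts, DataX uses a custom class loader, JarLoader, which gives each plugin its own isolated set of jars.

The loader itself is implemented by JarLoader.

The plugin-loading entry points are provided by the LoadUtil class.

To load a plugin, a JarLoader is instantiated and the thread context class loader is switched to it before the plugin is loaded; this switching is done by ClassLoaderSwapper.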

3.1 The JarLoader class

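JarLoader extends URLClassLoader with the ability to load from a directory: starting from each given path, it walks all subdirectories and adds every jar file it finds to the class path. The directories themselves are not added as class path entries; only the jars are.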

/**
 * Provides jar isolation: collects the given paths and all their subdirectories,
 * and adds the jar files found in them to the class path.
 */
public class JarLoader extends URLClassLoader {

    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length, "jar path must not be empty.");

        // The given paths plus all of their subdirectories
        List<String> dirs = new ArrayList<>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }

        // The jar files directly inside each of those directories
        List<URL> urls = new ArrayList<>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }

        return urls.toArray(new URL[0]);
    }

    // Recursively adds every subdirectory of path to collector.
    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }

        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }

        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }

            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }

    // Returns the URLs of the *.jar files directly inside path (not recursive).
    private static List<URL> doGetURLs(final String path) {
        Validate.isTrue(!StringUtils.isBlank(path), "jar path must not be empty.");
        File jarPath = new File(path);
        Validate.isTrue(jarPath.exists() && jarPath.isDirectory(), "jar path must exist and be a directory.");
        /* set filter */
        FileFilter jarFilter = pathname -> pathname.getName().endsWith(".jar");

        /* iterate all jar */
        File[] allJars = new File(path).listFiles(jarFilter);
        List<URL> jarURLs = new ArrayList<>(allJars.length);

        for (int i = 0; i < allJars.length; i++) {
            try {
                jarURLs.add(allJars[i].toURI().toURL());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_INIT_ERROR, "Error while loading jar file", e);
            }
        }
        return jarURLs;
    }
}
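For comparison, here is a JDK-only sketch of the same directory-to-jar-URLs idea using `java.nio.file.Files.walk`. `MiniJarLoader` and `collectJarUrls` are hypothetical names; DataX's `Validate` and `DataXException` are replaced by plain JDK exceptions, and the URLs are sorted only so the order is stable.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MiniJarLoader extends URLClassLoader {
    MiniJarLoader(Path root, ClassLoader parent) throws IOException {
        super(collectJarUrls(root), parent);
    }

    // Every *.jar under root or any of its subdirectories; directories themselves are not added.
    static URL[] collectJarUrls(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            throw new IllegalArgumentException("jar path must exist and be a directory: " + root);
        }
        List<Path> jars;
        try (Stream<Path> paths = Files.walk(root)) {
            jars = paths.filter(Files::isRegularFile)
                        .filter(p -> p.getFileName().toString().endsWith(".jar"))
                        .sorted()
                        .collect(Collectors.toList());
        }
        URL[] urls = new URL[jars.size()];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = jars.get(i).toUri().toURL();
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("mysqlreader");
        Files.createDirectories(root.resolve("libs"));
        Files.createFile(root.resolve("mysqlreader-0.0.1.jar"));
        Files.createFile(root.resolve("libs").resolve("mysql-connector.jar"));
        Files.createFile(root.resolve("plugin.json")); // not a jar: ignored
        try (MiniJarLoader loader = new MiniJarLoader(root, MiniJarLoader.class.getClassLoader())) {
            for (URL url : loader.getURLs()) {
                System.out.println(url);
            }
        }
    }
}
```

Like JarLoader, this passes a parent loader to `URLClassLoader`, which delegates to the parent first; the isolation therefore applies to classes that are not already visible to the framework's own class loader.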
3.2 The LoadUtil class

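LoadUtil manages the plugin class loaders: getJarLoader returns the loader that belongs to a given plugin, and loadJobPlugin / loadTaskPlugin use it to instantiate the plugin's Job or Task class.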

/**
 * Created by jingxing on 14-8-24.
 *
 * Plugin loader. There are roughly three plugin types: reader, transformer (not yet implemented)
 * and writer. At run time, readers and writers may each run in one of two containers, Job or Task,
 * and a different class is loaded for each.
 */
public class LoadUtil {

    private static final String pluginTypeNameFormat = "plugin.%s.%s";

    private LoadUtil() {
    }

    private enum ContainerType {
        Job("Job"),
        Task("Task");

        private String type;

        ContainerType(String type) {
            this.type = type;
        }

        public String value() {
            return type;
        }
    }

    /**
     * All plugin configurations are stored in pluginRegisterCenter. To tell readers, transformers
     * and writers apart, and to distinguish individual plugins, pluginType.pluginName is the key.
     */
    private static Configuration pluginRegisterCenter;

    /**
     * Cache of JarLoaders, keyed by plugin key.
     */
    private static Map<String, JarLoader> jarLoaderCenter = new HashMap<>();

    /**
     * Binds the plugin configurations so that plugins can look them up later.
     *
     * @param pluginConfigs
     */
    public static void bind(Configuration pluginConfigs) {
        pluginRegisterCenter = pluginConfigs;
    }

    /**
     * Builds a key from the plugin type and plugin name; the registry uses this key to find the plugin.
     *
     * @param pluginType PluginType
     * @param pluginName String
     * @return String
     */
    private static String generatePluginKey(PluginType pluginType, String pluginName) {
        return String.format(pluginTypeNameFormat, pluginType.toString(), pluginName);
    }

    /**
     * Gets the configuration for a plugin type and name:
     * 1. build the key from plugin type + plugin name;
     * 2. look the configuration up in pluginRegisterCenter by that key.
     */
    private static Configuration getPluginConf(PluginType pluginType, String pluginName) {
        Configuration pluginConf = pluginRegisterCenter
                .getConfiguration(generatePluginKey(pluginType, pluginName));

        if (null == pluginConf) {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
                    String.format("DataX cannot find the configuration of plugin [%s].", pluginName));
        }

        return pluginConf;
    }

    /**
     * Uses reflection to create the Job plugin for the given type + name; both readers and writers need one.
     *
     * @param type PluginType
     * @param name String
     * @return AbstractJobPlugin
     */
    public static AbstractJobPlugin loadJobPlugin(PluginType type, String name) {
        Class<? extends AbstractPlugin> clazz = loadPluginClass(type, name, ContainerType.Job);

        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();
            jobPlugin.setPluginConf(getPluginConf(type, name));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX cannot find the Job configuration of plugin [%s].", name), e);
        }
    }

    /**
     * Same idea as loadJobPlugin above, but creates the Task plugin; both readers and writers need one.
     *
     * @param type PluginType
     * @param name String
     * @return AbstractTaskPlugin
     */
    public static AbstractTaskPlugin loadTaskPlugin(PluginType type, String name) {
        Class<? extends AbstractPlugin> clz = LoadUtil.loadPluginClass(type, name, ContainerType.Task);

        try {
            AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clz.newInstance();
            taskPlugin.setPluginConf(getPluginConf(type, name));
            return taskPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX cannot find the Task configuration of plugin [%s].", name), e);
        }
    }

    /**
     * Loads the runner that matches the plugin type and name.
     */
    public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
        AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType, pluginName);

        switch (pluginType) {
            case READER:
                return new ReaderRunner(taskPlugin);
            case WRITER:
                return new WriterRunner(taskPlugin);
            default:
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                        String.format("Plugin [%s] must be of type [reader] or [writer]!", pluginName));
        }
    }

    /**
     * Loads the concrete plugin class via reflection: the "class" value from plugin.json
     * followed by "$Job" or "$Task", i.e. the plugin's nested Job or Task class.
     */
    @SuppressWarnings("unchecked")
    private static synchronized Class<? extends AbstractPlugin> loadPluginClass(PluginType pluginType,
            String pluginName, ContainerType pluginRunType) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
        try {
            return (Class<? extends AbstractPlugin>) jarLoader
                    .loadClass(pluginConf.getString("class") + "$" + pluginRunType.value());
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    // Returns the cached JarLoader of the plugin, creating it from the plugin's "path" on first use.
    public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                        String.format("%s plugin [%s] has an invalid path!", pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoader);
        }
        return jarLoader;
    }
}
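Two details of LoadUtil are worth isolating: the registry key format `plugin.%s.%s` and the one-loader-per-plugin cache in `getJarLoader`. The sketch below is a hypothetical `PluginLoaderCache`, not DataX code. It uses `ConcurrentHashMap.computeIfAbsent` instead of the original's `synchronized` methods plus `HashMap`, and, as a simplification, each loader points at the plugin directory itself rather than at the jars inside it.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PluginLoaderCache {
    // Same format string as LoadUtil.pluginTypeNameFormat.
    static String pluginKey(String type, String name) {
        return String.format("plugin.%s.%s", type, name);
    }

    // Hypothetical stand-in for jarLoaderCenter: one class loader per plugin key.
    private final Map<String, URLClassLoader> loaders = new ConcurrentHashMap<>();

    URLClassLoader loaderFor(String type, String name, Path pluginPath) {
        if (pluginPath == null) {
            throw new IllegalArgumentException(type + " plugin [" + name + "] has an invalid path!");
        }
        // Created on the first request and reused afterwards (like the null check + put in getJarLoader).
        return loaders.computeIfAbsent(pluginKey(type, name), key -> {
            try {
                // Simplification: the plugin directory itself is the only class path entry.
                URL[] urls = {pluginPath.toUri().toURL()};
                return new URLClassLoader(urls, PluginLoaderCache.class.getClassLoader());
            } catch (MalformedURLException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    public static void main(String[] args) {
        PluginLoaderCache cache = new PluginLoaderCache();
        Path readerDir = Paths.get("plugin", "reader", "mysqlreader");
        URLClassLoader first = cache.loaderFor("reader", "mysqlreader", readerDir);
        URLClassLoader again = cache.loaderFor("reader", "mysqlreader", readerDir);
        URLClassLoader writer = cache.loaderFor("writer", "mysqlwriter", Paths.get("plugin", "writer", "mysqlwriter"));
        System.out.println(pluginKey("reader", "mysqlreader")); // plugin.reader.mysqlreader
        System.out.println(first == again);  // true: same key, cached loader
        System.out.println(first == writer); // false: each plugin gets its own loader
    }
}
```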

3.3 ClassLoaderSwapper

Swaps the current thread's context class loader.

/**
 * Created by jingxing on 14-8-29.
 *
 * To avoid jar conflicts (for example, hbase may have several versions of reader/writer dependency
 * jars), JobContainer and TaskGroupContainer must leave the current classLoader to load those jars,
 * and switch back to the original classLoader afterwards to run the remaining code.
 */
public final class ClassLoaderSwapper {
    private ClassLoader storeClassLoader = null;

    private ClassLoaderSwapper() {
    }

    public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
        return new ClassLoaderSwapper();
    }

    /**
     * Saves the current classLoader and sets the current thread's classLoader to the given one
     * (swapping the two).
     *
     * @param classLoader the class loader to install
     * @return the previous context class loader
     */
    public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
        // 1. save the current thread's context classLoader in storeClassLoader
        this.storeClassLoader = Thread.currentThread().getContextClassLoader();
        // 2. set the current thread's context classLoader to the given classLoader
        Thread.currentThread().setContextClassLoader(classLoader);
        return this.storeClassLoader;
    }

    /**
     * Sets the current thread's class loader back to the saved one.
     *
     * @return the class loader that was active before the restore
     */
    public ClassLoader restoreCurrentThreadClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.storeClassLoader);
        return classLoader;
    }
}
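Typical use of the swapper is: swap in the plugin's loader, do the plugin work, restore. The sketch below restates that pattern with JDK types only; `SwapDemo`, `Swapper` and `runWith` are hypothetical names, not DataX API. It puts the restore in a `finally` block so the original context class loader comes back even if the plugin code throws; that is a choice of this sketch, not something the source above shows.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

class SwapDemo {
    // Minimal restatement of the swapper idea (not the DataX class itself).
    static final class Swapper {
        private ClassLoader stored;

        ClassLoader swapIn(ClassLoader loader) {
            stored = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(loader);
            return stored;
        }

        ClassLoader restore() {
            ClassLoader current = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(stored);
            return current;
        }
    }

    // Runs work with pluginLoader as the context class loader and always restores the original.
    static <T> T runWith(ClassLoader pluginLoader, Supplier<T> work) {
        Swapper swapper = new Swapper();
        swapper.swapIn(pluginLoader);
        try {
            return work.get();
        } finally {
            swapper.restore();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0], SwapDemo.class.getClassLoader())) {
            boolean swapped = runWith(pluginLoader,
                    () -> Thread.currentThread().getContextClassLoader() == pluginLoader);
            System.out.println("inside: plugin loader active = " + swapped); // inside: plugin loader active = true
        }
        boolean restored = Thread.currentThread().getContextClassLoader() == original;
        System.out.println("after: original restored = " + restored); // after: original restored = true
    }
}
```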
If reposting, please credit the source: https://bianchenghao.cn/hz/112740.html