DataX源码分析 reader

DataX源码分析 reader本文详细解读了 DataX 框架中的 Reader 组件 阐述了其如何通过插件化设计处理不同数据源 包括源码分析步骤 配置文件解析 动态加载以及数据处理流程

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制



前言

DataX的Reader组件负责从数据源中读取数据,并将这些数据转换成DataX框架可以处理的数据格式。DataX的Reader组件采用了插件化的设计,使得添加新的数据源类型变得相对容易。只需要实现相应的Reader接口或抽象类,并提供必要的配置参数,就可以将新的数据源集成到DataX框架中。这种可扩展性使得DataX能够适应不断变化的数据环境。Reader通常与特定的数据源绑定,每种数据源类型可能都需要一个独立的Reader实现。

以下是一个简化的源码分析步骤,以DataX的MySQLReader为例:


Reader组件如何处理各类数据源

DataX的Reader组件处理不同的数据源类型主要是通过抽象和扩展的机制来实现的。具体来说,DataX框架为每种数据源类型定义了一个Reader接口或抽象类,并为每种具体的数据源实现了相应的Reader类。

以下是DataX的Reader组件如何处理不同数据源类型的基本步骤:

通过以上步骤,DataX的Reader组件能够灵活处理不同类型的数据源,并实现了数据从数据源到DataX框架的顺畅传输。同时,这种抽象和扩展的机制也使得DataX框架易于扩展,可以方便地添加对新数据源类型的支持。

源码

 / * 每个Reader插件在其内部内部实现Job、Task两个内部类。 * * * */ public abstract class Reader extends BaseObject { 
    / * 每个Reader插件必须实现Job内部类。 * * */ public static abstract class Job extends AbstractJobPlugin { 
    / * 切分任务 * * @param adviceNumber * * 着重说明下,adviceNumber是框架建议插件切分的任务数,插件开发人员最好切分出来的任务数>= * adviceNumber。<br> * <br> * 之所以采取这个建议是为了给用户最好的实现,例如框架根据计算认为用户数据存储可以支持100个并发连接, * 并且用户认为需要100个并发。 此时,插件开发人员如果能够根据上述切分规则进行切分并做到>=100连接信息, * DataX就可以同时启动100个Channel,这样给用户最好的吞吐量 <br> * 例如用户同步一张Mysql单表,但是认为可以到10并发吞吐量,插件开发人员最好对该表进行切分,比如使用主键范围切分, * 并且如果最终切分任务数到>=10,我们就可以提供给用户最大的吞吐量。 <br> * <br> * 当然,我们这里只是提供一个建议值,Reader插件可以按照自己规则切分。但是我们更建议按照框架提供的建议值来切分。 <br> * <br> * 对于ODPS写入OTS而言,如果存在预排序预切分问题,这样就可能只能按照分区信息切分,无法更细粒度切分, * 这类情况只能按照源头物理信息切分规则切分。 <br> * <br> * * * */ public abstract List<Configuration> split(int adviceNumber); } public static abstract class Task extends AbstractTaskPlugin { 
    public abstract void startRead(RecordSender recordSender); } } 
 public class MysqlReader extends Reader { 
    private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql; public static class Job extends Reader.Job { 
    private static final Logger LOG = LoggerFactory .getLogger(Job.class); private Configuration originalConfig = null; private CommonRdbmsReader.Job commonRdbmsReaderJob; @Override public void init() { 
    this.originalConfig = super.getPluginJobConf(); Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE); if (userConfigedFetchSize != null) { 
    LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置."); } this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE); this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE); this.commonRdbmsReaderJob.init(this.originalConfig); } @Override public void preCheck(){ 
    init(); this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE); } @Override public List<Configuration> split(int adviceNumber) { 
    return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber); } @Override public void post() { 
    this.commonRdbmsReaderJob.post(this.originalConfig); } @Override public void destroy() { 
    this.commonRdbmsReaderJob.destroy(this.originalConfig); } } public static class Task extends Reader.Task { 
    private Configuration readerSliceConfig; private CommonRdbmsReader.Task commonRdbmsReaderTask; @Override public void init() { 
    this.readerSliceConfig = super.getPluginJobConf(); this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId()); this.commonRdbmsReaderTask.init(this.readerSliceConfig); } @Override public void startRead(RecordSender recordSender) { 
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE); this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, super.getTaskPluginCollector(), fetchSize); } @Override public void post() { 
    this.commonRdbmsReaderTask.post(this.readerSliceConfig); } @Override public void destroy() { 
    this.commonRdbmsReaderTask.destroy(this.readerSliceConfig); } } } 
 public class RdbmsReader extends Reader { 
    private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; static { 
    //加载插件下面配置的驱动类 DBUtil.loadDriverClass("reader", "rdbms"); } public static class Job extends Reader.Job { 
    private Configuration originalConfig; private CommonRdbmsReader.Job commonRdbmsReaderMaster; @Override public void init() { 
    this.originalConfig = super.getPluginJobConf(); int fetchSize = this.originalConfig.getInt( com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, Constant.DEFAULT_FETCH_SIZE); if (fetchSize < 1) { 
    throw DataXException .asDataXException( DBUtilErrorCode.REQUIRED_VALUE, String.format( "您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", fetchSize)); } this.originalConfig.set( com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, fetchSize); this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job( DATABASE_TYPE); this.commonRdbmsReaderMaster.init(this.originalConfig); } @Override public List<Configuration> split(int adviceNumber) { 
    return this.commonRdbmsReaderMaster.split(this.originalConfig, adviceNumber); } @Override public void post() { 
    this.commonRdbmsReaderMaster.post(this.originalConfig); } @Override public void destroy() { 
    this.commonRdbmsReaderMaster.destroy(this.originalConfig); } } public static class Task extends Reader.Task { 
    private Configuration readerSliceConfig; private CommonRdbmsReader.Task commonRdbmsReaderSlave; @Override public void init() { 
    this.readerSliceConfig = super.getPluginJobConf(); this.commonRdbmsReaderSlave = new SubCommonRdbmsReader.Task( DATABASE_TYPE); this.commonRdbmsReaderSlave.init(this.readerSliceConfig); } @Override public void startRead(RecordSender recordSender) { 
    int fetchSize = this.readerSliceConfig .getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE); this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender, super.getTaskPluginCollector(), fetchSize); } @Override public void post() { 
    this.commonRdbmsReaderSlave.post(this.readerSliceConfig); } @Override public void destroy() { 
    this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig); } } } 
今天的文章 DataX源码分析 reader分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2025-01-03 13:33
下一篇 2025-01-03 13:30

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/100577.html