数据传输支持实时同步任务的创建、管理。
版本:2.0.1,认证方式:Kerberos认证
版本:2.3.1,认证方式:无认证(仅支持读)
版本:2.3.1,认证方式:SASL_Plaintext(仅支持读)
版本:2.7.1,认证方式:无认证(仅支持读)
版本:2.7.1,认证方式:SASL_Plaintext(仅支持读)
功能入口: 在数据传输页面,选择左侧导航栏“实时同步任务”,点击“新建任务”进入任务创建页面。
选择数据来源
首先配置数据来源,在选择数据来源环节配置任务基本信息和选择数据来源。
任务基本信息

注意:同一项目-集群下不允许存在同名任务
1)任务导入导出时对库名、Topic名等进行替换。
2)任务常用的高级配置-自定义参数配置为参数组,可实现不同任务间的自定义参数复用。
功能使用注意事项:
1)同一实时同步任务内多个参数组内有相同参数项时,系统取排在前面的参数组的参数值。
2)如果字段映射支持的内置变量与参数组参数冲突时,内置变量优先级更高。
3) 如果高级设置填写的参数与参数组参数冲突时,则高级设置填写的参数优先级更高。
4)如果导入引用参数组的实时同步任务时,会检测导入端是否存在同名参数组,如不存在则检测不通过,导入失败。如导入端存在同名参数组,则在导入端会将任务引用的参数组id替换为导入端同名参数组的id。
功能详细使用步骤:
数据传输-实时同步任务:“引用参数组”配置项选取需引用的参数组。
1)如参数组的使用场景是:任务常用的高级配置-自定义参数配置为参数组,可实现不同任务间的自定义参数复用,则“引用参数组”配置项选取需引用的参数组即可。
2)如参数组的使用场景是:任务导入导出时对库名、Topic名等进行替换,除“引用参数组”配置项选取需引用的参数组外,请在使用变量处按照${参数组名称}的格式填写参数组参数。示例详见下图。

支持使用参数组参数的位置包含:
1)数据来源:
- 数据源类型为Kafka-【topic名称】
- 任务类型为多表同步,数据源类型为MySQL、Oracle、TeleDB、SQLServer-【数据库名称】
- 任务类型为多表同步或分库分表同步,数据源类型为Oracle,增量读取方式为ogg-【Topic名称】
- 来源为Kafka、消费起始位点为时间戳
- 来源为MySQL、消费起始位点为时间戳
- 来源为MySQL、消费起始位点为指定binlog日志-binlog日志位置
- 来源为Oracle、增量读取方式为logminer、消费起始位点为时间戳
- 来源为Oracle、增量读取方式为logminer、消费起始位点为指定scn
- 来源为Oracle、增量读取方式为ogg、消费起始位点为时间戳
- 来源为SQLServer、消费起始位点为指定lsn
- 来源为TeleDB、消费起始位点为指定binlog日志-binlog日志位置
- 来源为TelePG、消费起始位点为指定lsn
2)数据去向:
- 数据源类型为Kudu-【去向表名】
- 数据源类型为Iceberg、Arctic、Hive-【去向库名】和【去向表名】
- 数据源类型为Kafka-【去向Topic】
3)字段映射:
数据去向为Kudu、Iceberg、Arctic、Hive时,来源表字段类型为自定义表达式时,支持填写参数组参数。
选择数据来源
1、Kafka Redaer
1)如果选择json,解析Topic schema时仅解析首层字段。
示例:
{"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
则字段映射中来源Topic字段为:table、op_type、op_ts、current_ts、pos、after。
2)如果选择canal json,解析Topic schema时除data字段外仅解析首层字段,data嵌套字段内则子字段作为来源Topic字段。
示例:
insert
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"INSERT"}
update
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"DELETE"}
{"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"INSERT"}
delete
{"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"DELETE"}
则字段映射中来源Topic字段为:id、name、age、address、type。
3)如果选择debezium-json,解析Topic schema时除before/after字段外仅解析首层字段,以before/after嵌套字段内的子字段作为来源Topic字段。
示例:
insert
{"before":null,"after":{"id":22,"name":"opt1","age":null},"op":"c"}
update
{"before":{"id":22,"name":"opt1","age":null},"after":null,"op":"d"}
{"before":null,"after":{"id":22,"name":"opt1123","age":null},"op":"c"}
delete
{"before":{"id":22,"name":"opt1123","age":null},"after":null,"op":"d"}
则字段映射中来源Topic字段解析为:id、name、age、op。
如勾选首层嵌套JSON平铺,会将首层中类型为row或map的嵌套字段解析为row类型并分解为单层结构,字段名称为“首层字段名称.次层字段名称”。
示例Topic:
{ "obj": { "time1": "12:12:43Z", "str": "sfasfafs", "lg": { "name": "1" }, "num": 3 }, "arr": [ { "f1": "f1str11", "f2": 134 }, { "f1": "f1str22", "f2": 555 } ], "map": { "key1": 234, "key2": 456 } }
则字段映射中来源Topic字段解析为:obj.time1、obj.str、obj.lg、num、arr、map.key1、map.key2。
如从指定时间戳开始读取,请按“yyyy-MM-dd HH:mm:ss.SSS”格式填写,其中“.SSS”可不填。
2、MySQL Redaer、Oracle Reader、PostgreSQL、TeleDB Reader、SQLServer Reader、TelePG Reader
MySQL、Oracle、PostgreSQL、TeleDB、SQLServer、TelePG的配置类似,故此处以TeleDB为例进行详细介绍。
1)多表(Topic)同步

注意:如某张来源表已被项目-集群内的其余实时同步任务采集,则表名后会展示“已采集”的标签,鼠标停留在标签时会提示已采集该表的任务名称。如仅需展示未采集表,可在穿梭框下方勾选“仅展示未采集表”。


2)分库分表同步



点击“下一步”进行数据去向配置。
选择数据去向
1、Hive、Iceberg、Arctic、MySQL、Oracle、SQLServer、StarRocks、Doris、Kudu Writer
去向为Hive、Iceberg、Arctic、MySQL、Oracle、SQLServer、StarRocks、Doris、Kudu时,配置相似,仅当去向为Hive时额外支持任务过程中创建Hive表。 故此处以Iceberg为例进行详细介绍。
数据源名称:仅当当前集群存在绑定的Iceberg数据源时,此处可选择Iceberg数据源。数据源名称格式为:项目-集群 Iceberg数据源,可选择当前项目-当前集群的Iceberg数据源和在安全中心存在库表公开给当前项目的其余项目-当前集群的Iceberg数据源。
批量设置源表和去向表映射:

注意:任务类型为多表(Topic)同步、去向为Hive和MySQL时,请勿将多张来源表写入同一去向表,否则任务运行会报错!! 1)批量选择去向库:选中的数据库将批量填充“设置来源Topic和去向表映射”处的去向库名
2)选择表名规则:来源表名和去向表名转换规则 和 去向表名规则可任选其一或组合使用。
-来源表名和去向表名转换规则 :所有的该转换规则都是针对原始表名的转换,转换完成之后的结果可以使用${source table transed},在“去向表名规则”中作为变量来使用。
注意:
1、如果不使用“去向表名规则”,则此规则将直接影响最终实际去向表名。
2、如果使用“去向表名规则”,则此规则将只影响变量${source table transed}的值,不直接影响最终实际去向表名。
-去向表名规则:
1)可以使用内置的变量命名去向表名,比如给表名加前缀“AAA”,可以使用 AAA${source table transed}。
2)可使用的内置变量如下:
1、${source table}:任务类型为多表(Topic)同步时,表示来源表名;任务类型为分库分表同步时,表示来源逻辑表名。
2、${source database}:任务类型为多表(Topic)同步时,表示来源库名;任务类型为分库分表同步时,此变量不生效。
3、${source datasource}:表示来源数据源名称。
4、${source table transed}:“来源表名和去向表名转换规则”中的转换完成之后的表名。
1)分区方式为根据数据生成时间分区或根据数据写入时间分区时,支持填写内置变量:yyyy、MM、dd、HH、mm、ss,分别表示数据生成时间/数据写入时间的年、月、日、时、分、秒,支持多个内置变量组合使用,如:yyyy-MM-dd,表示数据生成时间/数据写入时间的当日,示例:2022-01-02。
2)分区方式为根据字段内容动态分区时,支持选择来源表字段,会将源端对应字段所在数据行写入到表对应的分区中。示例:选择来源表字段为A,当A字段值为aa时,实时同步会将数据写入到对应的aa分区中,当A字段值为bb时,实时同步会将数据写入到对应的bb分区中。
3)请保证所有去向表的分区结构保持一致,否则任务可能运行失败。
注意:使用upsert要求Iceberg表同时满足两个条件:1、版本是V2,2、有主键或分区键,否则任务会报错。
2、Kafka Writer
数据源名称:仅可选择当前项目组下该数据源类型的数据源。

批量设置源表(Topic)和去向Topic映射: 1)选择Topic名称规则:来源表名和去向Topic名转换规则 和 去向Topic名称规则可任选其一或组合使用。
-来源表名和去向表名转换规则 :所有的该转换规则都是针对原始表名的转换,转换完成之后的结果可以使用${source topic transed},在“去向Topic名称规则”中作为变量来使用。
注意:
1、如果不使用“去向Topic名称规则”,则此规则将直接影响最终实际去向Topic名称。
2、如果使用“去向Topic名称规则”,则此规则将只影响变量${source topic transed}的值,不直接影响最终实际去向Topic名称。
-去向表名规则:
1)可以使用内置的变量命名去向Topic名,比如给Topic名加前缀“AAA”,可以使用 AAA${source table transed}。
2)可使用的内置变量如下:
1、${source table}:任务类型为多表(Topic)同步时,表示来源表名;任务类型为分库分表同步时,表示来源逻辑表名。
2、${source database}:任务类型为多表(Topic)同步时,表示来源库名;任务类型为分库分表同步时,此变量不生效。
3、${source datasource}:表示来源数据源名称。
4、${source topic transed}:“来源表名和去向Topic名称转换规则”中的转换完成之后的Topic名。
1)同步时会使用同步主键值作为Kafka记录的key,确保同主键的变更有序写入Kafka的同一分区。
2)如来源表有主键,建议使用来源表字段作为同步主键;如来源表无主键,建议使用其他非主键的一个或几个字段的联合,代替主键作为Kafka记录的key,选取多个字段联合作为同步主键,字段拼接时字段间以;分隔。
3)如同步主键为空,则同步时Kafka记录的key为空。如果要确保表的变更有序写入Kafka,则选择写入的Kafka Topic必须是单分区。
2)来源为Kafka、去向为Kafka时,去向的序列化格式要求与来源的序列化格式保持一致。
完成数据去向配置后,点击“下一步”进行字段映射配置。
字段映射
除来源为Kafka且去向为Kafka外,其余数据来源和去向的组合均需配置字段映射。
字段映射支持使用的内置变量如下:
(1)来源为MySQL,去向为Hive、Iceberg、Kudu、Arctic:${op}、${op_ts}
(2)来源为TeleDB,去向为Hive、Iceberg、Kudu、Arctic:${op}、${op_ts}
(3)来源为Oracle、去向为Hive、Iceberg、Kudu、Arctic:增量读取方式为logminer,支持填写${op}、${op_ts}、${scn};增量读取方式为ogg,支持填写${op}、${op_ts}
(4)来源为SQLServer,去向为Hive、Iceberg、Kudu、Arctic:${op}、${op_ts}、${change_lsn}
(5)来源为TelePG,去向为Hive、Iceberg、Kudu、Arctic:${op}、${op_ts}、${lsn}
(6)来源为Kafka且序列化格式非json、去向为Hive、Iceberg、Kudu、Arctic:${op}。此外,当序列化格式为ogg-json时,支持${pos}、${op_ts}、${current_ts}、${source table}
内置变量填写方式:来源表字段选择为自定义表达式,并按照${变量名称}格式输入内置变量,可参照下图。
在映射配置中分为全局映射配置和单个来源表(Topic)-去向表(Topic)映射配置。
同名映射:根据去向表字段名称匹配同名的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
同行映射:根据去向表字段序号匹配序号相同的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
在页面左侧切换Topic-表映射时,可针对选中的Topic-表映射配置字段映射。
对于单个Topic映射配置支持“获取最新表结构”、“一键解析Topic字段”、“同名映射”、“同行映射”、“设置自定义表达式字段类型”。
a.新增字段及字段顺序调整:不改变已有的字段映射列表,下拉选项中增加新增字段
b.删除字段:如果被删除的字段原已在列表中被选中,则清空来源表字段选择框的值。
c.字段类型或描述变更:更新“类型”列该字段的字段类型或字段描述 (2)数据去向
a.字段顺序调整:字段映射列表中按变更后的去向表字段顺序展示,并保留映射的来源表字段信息。
b.新增字段:字段映射列表中新增行,行序号为去向表中该字段的列序号。针对该行的来源表字段:先使用同名映射匹配是否存在同名的来源表字段,如存在同名字段则来源表字段置为同名字段,如不存在同名字段则清空来源表字段选择框的值。
c.删除字段:删除字段映射列表中的对应行。
d.字段类型变更:更新“字段类型”列该字段对应的值。
e.字段描述变更:更新“描述”列该字段的字段描述。
(3)消息通知框整体提醒
此外,针对来源或去向的表结构变更,消息通知框中会详细提示各类变更情况以及相应的字段。
关于字段类型,规则如下:
- string定义为string
- int、bigint、tinyint、smallint统一定义为bigint
- float、double、decimal统一定义为decimal
- boolean定义为boolean
- row、map统一定义为row
- array定义为array
- time、timestamp、date、byte、varbinary无法解析,统一定义为string
此外,点击复制图标可将来源Topic字段类型批量映射为去向Hive表字段类型,映射规则如下。在推断类型准确性欠佳时可考虑使用此功能。
其映射规则如下:
完成字段映射配置后,点击“下一步”进入高级配置页面。
高级设置
该页面支持参数的填写。参数名称填写格式:source/target.参数名称,source.表示数据来源端参数,target.表示数据去向端参数。示例:source.json.timestamp-format.standard。
6. 配置完成后,点击“保存”按钮即可完成任务配置。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ri-ji/5150.html