首页   

增量同步和全量同步是数据库同步的两种方式。

全量同步是一次性同步全部数据,增量同步则只同步两个数据库不同的部分。

多表同步大家肯定都会想用最省事的方法,比如就建立一个公共的Json模板,将读库(reader)和写库(writer)的连接地址、端口、账号、密码、表名都动态传入,然后字段用*号代替。那博主就告诉你,后续出错和维护的坑你得走一遍了,并不是说不行,具体还是看业务场景来。

避免大家踩坑,这里的多表同步会采用脚本进行动态参数传入,建议每个表对应一个Json文件,每个字段都单独写,并且字段用 ` 符号包起来( 如果某个字段是MySQL关键字会报错 ),不要嫌麻烦,不然后面出问题了问题更大,你还得重来一遍,如果涉及几十上百张表,你会懂那个痛苦的,频繁的Ctrl+C和Ctrl+V让你怀疑人生,不多说了,都是泪 。

话不多说,直接开撸

一、全量同步

1.示例Shell脚本文件:allSyncTask.sh
#!/bin/bash
. /etc/profile
# 读库的IP
r_ip="127.0.0.1"
# 读库的端口
r_port="3306"
# 读库的数据库名称
r_dbname="datax"
# 读库的账号
r_username="root"
# 读库的密码
r_password="123456"
# 写库的IP
w_ip="127.0.0.1"
# 写库的端口
w_port="3306"
# 写库的数据库名称
w_dbname="datax2"
# 写库的账号
w_username="root"
# 写库的密码
w_password="123456"
# DataX全量同步(多个文件直接写多个执行命令)
python /opt/datax/bin/datax.py /opt/datax/job/table1.json -p "-Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
python /opt/datax/bin/datax.py /opt/datax/job/table2.json -p "-Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
2.示例Json文件(这里随便写一个,大家当做模板参考就行):table1.json

因为是多表同步,会执行多个文件,我这里的channel参数设置为10。

"job": { "setting": { "speed": { "channel": 10 "errorLimit": { "record": 0, "percentage": 0.02 "content": [ "reader": { "name": "mysqlreader", "parameter": { "column": [ "`id`", "`name`" "connection": [ "jdbcUrl": ["jdbc:mysql://${r_ip}:${r_port}/${r_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull"], "table": ["tabel1"] "username": "${r_username}", "password": "${r_password}" "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "update", "column": [ "`id`", "`name`" "session": [ "set session sql_mode='ANSI'" "connection": [ "jdbcUrl": "jdbc:mysql://${w_ip}:${w_port}/${w_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull", "table": ["tabel1"] "username": "${w_username}", "password": "${w_password}"
3.运行脚本即可

运行脚本之前先将放入的脚本进行授权,否则不能执行,对文件进行最高级别授权命令:

chmod 777 allSyncTask.sh

allSyncTask.sh放入指定文件夹,运行命令:

./allSyncTask.sh

至此全量同步完成

二、定时增量同步

相比全量同步,增量同步增加了用某个字段来判断改条数据是否被更新,具体体现在 Shell 脚本和 Json 文件中,下面会提到。

1.准备工作:

给需要同步的表增加用于增量同步判断的字段,可用ID也可以用时间,个人建议用时间,更方便。

给读库的table1表加上实时更新的时间记录字段(可根据需要是否给写库也加上,我这里是加上的/font>):

alter table table1 add column `curr_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP  COMMENT '最后更新时间(DataX数据采集使用)';

具体可参考我的其他博客的第二点:MySQL | 设置时间字段为自动更新(CURRENT_TIMESTAMP).

2.示例Shell脚本文件(相比于全量同步,这里使用了当前时间做条件传入):incrSyncTask.sh
#!/bin/bash
. /etc/profile
# 当前时间(用于增量同步判断条件)
curr_time=$(date +%Y-%m-%d)
# 读库的IP
r_ip="127.0.0.1"
# 读库的端口
r_port="3306"
# 读库的数据库名称
r_dbname="datax"
# 读库的账号
r_username="root"
# 读库的密码
r_password="123456"
# 写库的IP
w_ip="127.0.0.1"
# 写库的端口
w_port="3306"
# 写库的数据库名称
w_dbname="datax2"
# 写库的账号
w_username="root"
# 写库的密码
w_password="123456"
# DataX全量同步(多个文件直接写多个执行命令)
python /opt/datax/bin/datax.py /opt/datax/job/incr_table1.json -p "-Dcurr_time=$curr_time -Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
python /opt/datax/bin/datax.py /opt/datax/job/incr_table2.json -p "-Dcurr_time=$curr_time -Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"

至于为什么加. /etc/profile,参考此文章的第二点:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.

3.示例Json文件(相比于全量同步,这里使用了在读库(reader)加了一个where条件,条件就是传入的当前日期):incr_table1.json
"job": { "setting": { "speed": { "channel": 10 "errorLimit": { "record": 0, "percentage": 0.02 "content": [ "reader": { "name": "mysqlreader", "parameter": { "column": [ "`id`", "`name`", "`curr_time`" "where": "curr_time between DATE_SUB('${curr_time} 05:00:00', INTERVAL 1 DAY) and '${curr_time} 04:59:59'", "connection": [ "jdbcUrl": ["jdbc:mysql://${r_ip}:${r_port}/${r_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull"], "table": ["tabel1"] "username": "${r_username}", "password": "${r_password}" "writer": { "name": "mysqlwriter", "parameter": {"writeMode": "update", "column": [ "`id`", "`name`", "`curr_time`" "session": [ "set session sql_mode='ANSI'" "connection": [ "jdbcUrl": "jdbc:mysql://${w_ip}:${w_port}/${w_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull", "table": ["tabel1"] "username": "${w_username}", "password": "${w_password}"

简单说一下上面的增量逻辑:在 Shell 脚本中传入当前日期给where条件时间,我会让定时器在凌晨五点执行,所以查询的是昨天凌晨5点到今天04:59:59有修改的数据,查询条件就是第1点说到的实时修改时间

4.添加定时器,这里使用的是crontab

运行脚本之前先将放入的脚本进行授权,否则不能执行,对文件进行最高级别授权命令:

chmod 777 incrSyncTask.sh

4.1.编辑定时任务crontab,使用以下命令进入crontab任务文件:

crontab -e

4.2.进行vi编辑即可,添加crontab定时任务(每天凌晨5点执行,跟cron表达式类似):

0 5 * * * /opt/datax/bin/incrSyncTask.sh >/dev/null 2>&1

建议加上>/dev/null 2>&1,因为crontab会把日志都发给mail,这里是直接丢弃一样
,也可以指定输出到某个文件中,参考如下命令,每天执行后生成日志文件:

0 5 * * * /opt/datax/bin/incrSyncTask.sh > /opt/datax/log/incrSyncTask_$(date +\%Y\%m\%d).log 2>&1
 

这里需要用斜杠"\"对百分号进行转义

4.3.查看crontab日志:

tail -f /var/log/cron
tail -f /var/spool/mail/root

如果找不到/var/spool/mail/root文件,参考:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.

定时任务跑了之后,会将日志文件输出到上面的位置,根据登录宿主机的用户不同,文件名不同。这里的发送到邮箱有些坑,日志量过大问题、找不到邮箱问题,参考:DataX踩坑2 | 定时任务crontab不执行或报错:/bin/sh: java: command not found.

三、注意点

1.关于脚本运行后连接数据库报连接失败,重新连接等错误(\r换行符的问题)

其实脚本中的所有数据库参数都是正确的,但就是报错,可以看日志,发现每个参数后面都携带了\r换行符
在这里插入图片描述
解决方法:
Shell 脚本参数传递时有 \r 换行符问题.

2.批量执行多文件命令报错

建议将speed中的channel参数设置大一点,如10。
由于DataX读取数据和写入数据需要占用服务器性能,所以需要考虑服务器性能,channel的值并不是越大越好。
具体可参考结尾的性能优化文章。

3.执行命令报:FileNotFoundException

本来想将全量同步 Json 文件和增量同步 Json 文件放入不同的文件夹A和B,便于区分,直接放在 DataX 安装路径的 job 目录中,结果执行命令时找不到 job 目录下的A、B文件夹中的Json文件

4.定时任务crontab不执行 或 报错:/bin/sh: java: command not found

参考文章:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.

已优化重构并迁移至datax-admin common-datax 基于阿里DataX开发一个通用导数的微服务,可以开发前台页面,根据reader和writer自动进行数据同步 本项目只限于同步数据源量很少的时候使用,若是数据源很多的情况,请参考下面的设计思路 由于阿里DataX有一些缺点: 不够自动化 需要手写json 需要手动运行job 搬砖的时间很宝贵,所以: 提供通用数据抽取restful接口 HDFS自动创库创表创分区 利用freemarker模板自动创建json文件 自动python执行job 集成Azkaban进行调度管理 例如:mysql到hive 选择mysql需要同步的表、字段等信息,输入导入到hive的库表分区等信息,不需提前在hive进行创库创表创分区,自动根据要导的mysql表以及字段类型进行创建hive库表分区,然后利用 echo '文章同步 MySQL To MySQL'DATAX组件(t_article) 用到2个插件mysqlreader^[1]、mysqlwriter^[2] 选 自定义模板:
开源 ETL 工具 DataX 实践,从mysql 到不同结构的另一个mysql全量同步和批量更新 链接: datax官方项目地址 查看全量同步 查看批量更新 实践步骤: 参照官方文档,采用方法一部署 如果点击下载没反应,手动复制地址,把http换成https 下载解压完成,运行自检脚本 File “datax.py”, line 114 print readerRef 。因为我电脑安装的是python3 ,脚本里是python2语法 修改下 datax.p
增量同步全量同步是数据库同步的两种方式。全量同步是一次性同步全部数据,增量同步则只同步两个数据库不同的部分。由于遇到的项目是从A库每天将增量数据同步到B库。所以考虑采用多表定时增量同步。 一. 确定增量同步的判断字段,可用ID也可以用时间。 由于需求是按天将前一天的数据从A到B,所以将A库中的字段统计日期(tjrq) 作为判断条件。 tjrq是yyyy-mm-dd格式的日期 二. 编写shell脚本,配置读库(reader)和写库(writer)的连接地址、端口、账号、密码等。示例:etl.sh
数据中台的定义 数据中台是一套可持续“让企业的数据用起来”的机制,一种战略选择和组织形式,是依据企业特有的业务模式和组织架构,通过有形的产品和实施方法论支撑,构建一套持续不断把数据变成资产并服务于业务的机制。 数据中台的使命就是持续让数据用起来,它的一个根本性创新就是把“数据资产”作为一个基础要素独立出来,让成为资产的数据作为生产资料融入业务价值创造过程,持续产生价值,业务产生数据,数据服务业务,业务在阳,数据在阴,阴阳互补,形成闭环。 数据中台的核心能力 数据中台帮助企业实现数据的汇聚整合,.
https://blog.csdn.net/aWDac/article/details/80822233 [推荐]DataX实战应用 https://blog.csdn.net/u010429286/article/details/82356121 DataX是由Alibaba开源的一款异构数据同步工具,可以在常见的各种数据源...
1、如何调整jvm的参数? 调整datax.py文件中DEFAULT_JVM的值即可 2、插件对应的参数具体含义以及报错之后该如何解决,以MysqlReader为例 "reader": { "name": "mysqlreader", "parameter": { python ${DATAX_HOME}\bin\datax.py -r 读插件类型-w 写插件类型 举例: mysql导入到orcale数据库 python ${DATAX_HOME}\bin\datax.py -r mysqlreader -w oraclewriter CREATE TABLE `xx_datax_status` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `dbname` varchar(64) NOT NULL COMMENT '数据库名', `tbname` varchar(64) NOT NULL COMMENT '表名', `xx_xx_data_create_time` datetime DEFAULT NULL COMMENT '业务层表中数据创建
© 2022 微搜