DataX is the open-source version of Alibaba Cloud's DataWorks Data Integration, an offline data synchronization tool/platform widely used inside Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
As a synchronization framework, DataX abstracts every data source into a Reader plugin that reads from the source and a Writer plugin that writes to the target, so in principle the framework can synchronize between arbitrary data source types. The plugin system also forms an ecosystem: each newly added data source immediately becomes interoperable with all existing ones.
DataX already has a fairly complete plugin ecosystem covering mainstream RDBMS databases, NoSQL stores, and big-data computing systems; the full matrix of supported data sources is listed in the project README.
Release downloads: https://github.com/alibaba/DataX/releases
2. Extract and Use
tar -zxvf datax.tar.gz
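DataX requires a JDK (1.8+) and Python on the machine. After extracting, you can verify the installation with the self-check job that ships in the release package, which streams sample records from a streamreader to a streamwriter:

cd datax
python bin/datax.py job/job.json

If the environment is set up correctly, the job finishes with zero read/write failures in its summary.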
3.1 Full Sync Configuration Example
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": ["*"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": ["sys_area_code"],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/ry-cloud"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/ry-cloud-1",
                                "table": ["sys_area_code"]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 2
            }
        }
    }
}
Run the sync job; when it finishes, DataX prints a summary with the total records read, throughput, and failure count:
python ../bin/datax.py job.json
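One caveat before re-running this job: with "writeMode": "insert", every run appends the whole table again (or fails on primary-key conflicts). For a repeatable full refresh, mysqlwriter's preSql parameter can clear the target table before the write phase begins; a minimal sketch, assuming truncating the target table is acceptable:

"preSql": ["truncate table sys_area_code"]

Adding this line to the writer's parameter block makes the full sync idempotent.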
3.2 Incremental Sync
Step 1: write a job JSON that reads the max id from the source database
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/ry-cloud"],
                                "querySql": ["select max(id) from sys_area_code;"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "yuan_maxid",
                        "fileFormat": "csv",
                        "path": "/mnt/d/tools/CentOS7/soft/datax/job/",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
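A detail worth knowing: txtfilewriter appends a random suffix to fileName to keep output files unique, so the actual result is a file such as yuan_maxid__<random> under the configured path, and "writeMode": "truncate" removes files matching the fileName prefix before each run. You can inspect the captured value directly (same for the mudi_maxid file produced in Step 2):

cat /mnt/d/tools/CentOS7/soft/datax/job/yuan_maxid__*

The file contains the single max(id) value, which the crontab script in section 3.3 reads back into a shell variable.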
Step 2: write a job JSON that reads the max id from the target database
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/ry-cloud-1"],
                                "querySql": ["select max(id) from sys_area_code;"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "mudi_maxid",
                        "fileFormat": "csv",
                        "path": "/mnt/d/tools/CentOS7/soft/datax/job/",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
Step 3: write the incremental sync job JSON (note the where 1=1 placeholder in querySql; the script in section 3.3 rewrites it into the real incremental condition)
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/ry-cloud?characterEncoding=utf8"],
                                "querySql": ["select * from sys_area_code where 1=1;"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": ["*"],
                        "preSql": [],
                        "postSql": [],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/ry-cloud-1?characterEncoding=utf8",
                                "table": ["sys_area_code"]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
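The where 1=1 clause is a deliberate placeholder: on each run, the shell script below uses sed to substitute the real incremental predicate before submitting the job. For instance, if the target's max id were 42 (a made-up value for illustration), the rewrite would behave like this:

sed "s/1=1/id > '42'/g" job1.json > job1_tmp.json

so job1_tmp.json ends up with a querySql of select * from sys_area_code where id > '42';, and only rows above the target's high-water mark are read.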
3.3 Writing the crontab Script
#!/bin/bash

# Read the max id from the source database and write it to a CSV file
python /mnt/d/tools/CentOS7/soft/datax/bin/datax.py /mnt/d/tools/CentOS7/soft/datax/job/mysql_yuan.json
# $? is a shell variable holding the exit status of the last command: 0 is success, non-zero is failure; -ne means "not equal"
if [ $? -ne 0 ]; then
    echo "data.sh error: cannot get the max id from the source db!"
    exit 1
fi

# Read the max id from the target database and write it to a CSV file
python /mnt/d/tools/CentOS7/soft/datax/bin/datax.py /mnt/d/tools/CentOS7/soft/datax/job/mysql_mudi.json
if [ $? -ne 0 ]; then
    echo "data.sh error: cannot get the max id from the target db!"
    exit 1
fi

# Read the files written by txtfilewriter into shell variables
# (use full paths, because cron does not run the script from the job directory)
yuan_max_id=`cat /mnt/d/tools/CentOS7/soft/datax/job/yuan_maxid__*`
mudi_max_id=`cat /mnt/d/tools/CentOS7/soft/datax/job/mudi_maxid__*`

# If the source max id is greater than or equal to the target max id, run the incremental job
if [ "$yuan_max_id" -ge "$mudi_max_id" ]; then
    # Build the SQL WHERE condition
    WHERE="id > '$mudi_max_id'"
    # Replace the 1=1 placeholder with the real condition, writing a temporary copy of the job
    sed "s/1=1/$WHERE/g" /mnt/d/tools/CentOS7/soft/datax/job/job1.json > /mnt/d/tools/CentOS7/soft/datax/job/job1_tmp.json
    # Run the rewritten job to incrementally import the new rows from the source MySQL into the target MySQL
    python /mnt/d/tools/CentOS7/soft/datax/bin/datax.py /mnt/d/tools/CentOS7/soft/datax/job/job1_tmp.json
fi
Run the shell script once by hand to verify it: if the source max id is ahead of the target's, DataX runs the rewritten job and the new rows land in the target table.
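Finally, register the script with cron so the incremental sync runs on a schedule. A minimal sketch, assuming the script above was saved as /mnt/d/tools/CentOS7/soft/datax/job/sync.sh (the script path and the 10-minute interval are assumptions; adjust to your environment):

# open the current user's crontab for editing
crontab -e

# run the incremental sync every 10 minutes and append all output to a log file
*/10 * * * * /bin/bash /mnt/d/tools/CentOS7/soft/datax/job/sync.sh >> /mnt/d/tools/CentOS7/soft/datax/job/sync.log 2>&1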