使用Datax将MySQL中的数据导入到TableStore中

背景

由于我们的数据在MySQL中的数据已经快接近亿级别,在访问MySQL并发读写的时候遇到了很大的瓶颈,严重的Block了我们的业务发展,主要从白天十点到晚上十点之前,并发访问的用户比较多,我们在写的前面加上了队列,系统后台自动同步。但是读上没有很好的办法解决,所以我们急需一个有较高吞吐量的实时存储系统。

本来准备自己搭建Hbase集群,但是考虑到运维代价和成本,最终放弃了这个方案。后面给阿里云发工单,了解到阿里云有一个类似于Hbase的产品,叫做TableStore,简单看了一下,总结一下优势:高并发、低延迟、按量计费、全托管。经历了一段时间的调研和使用之后,发现能满足我们业务需求,最终决定选用TableStore。

业务代码改造完成之后,需要将历史数据同步过去,使用了阿里开源的Datax插件,因此把整个迁移流程记录下来,分享给大家。

使用一键部署工具迁移数据

# 简单的5步实现数据迁移
# 下载包

# 第一步,下载一键部署包
git clone https://github.com/red-chen/one_key_install_datax.git

# 第二步,安装Datax
cd one_key_install_datax
sh datax.sh install

# 第三步,修改配置 (可以参考下面的样例进行配置,如果需要更高级的特性,请直接查看插件的帮助文档)
vim mysql_to_ots.json

# 第四步,运行
sh datax.sh run mysql_to_ots.json

# 第五步,等待执行完毕
任务启动时刻                    : 2016-06-30 00:00:00
任务结束时刻                    : 2016-06-30 16:00:00
任务总计耗时                    :              57600s
任务平均流量                    :               1.2M/s
记录写入速度                    :            1736rec/s
读出记录总数                    :           100000000
读写失败总数                    :                   0

准备插件

配置模板

  • 请自行修改{}中的内容
{
    "job": {
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",    
                    "parameter": {
                        "username": "{username}",
                        "password": "{passwd}",
                        "checkSlave":true,
                        "column": [
                            "{column_name}"
                        ],
                        "splitPk": "{pk}",
                        "connection": [
                            {
                                "table": [
                                    "{table_name}"
                                ],
                                "jdbcUrl": ["jdbc:mysql://{MySQL_HOST}:{MySQL_PORT}/{Database}"]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "endpoint":"{endpointnt}",
                        "accessId":"{accessId}",
                        "accessKey":"{accessKey}",
                        "instanceName":"{instanceName}",
                        "table":"{table}",
                        "primaryKey" : [
                            {"name":"{column_name}", "type":"{column_type}"}
                        ],
                        "column" : [
                            {"name":"{column_name}", "type":"{column_type}"}
                        ],
                        "writeMode" : "PutRow"
                    }
                }
            }
        ]
    }
}

样例

MySQL中的表

user_id type desc instance_count create_time
12009091 persion 李渊博 3 1467258591
12009092 company 北京天启传播有限公司 45 1460253572
  • | | | |
  • | | | |
  • | | | |
  • | | | |
  • 字段描述:

    • user_id 字符串
    • type 字符串
    • desc 字符串
    • instance_count 数值
    • create_time 数值

TableStore中的表

因为user_id是全局唯一的,所以我们只需要在TableStore创建一个PK为user_id的表即可,属性列不用创建,写入的时候直接创建

user_id

MySQL账户

  • host: tudou-user-rds.rds.cn-beiging.aliyun.com
  • port: 3163
  • user: root
  • passwd: 123456
  • db: meta
  • table: user_info

TableStore账户

样例配置

{
    "job": {
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",    
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "checkSlave":true,
                        "column": [
                            "user_id", "type", "desc", "instance_count", "create_time"
                        ],
                        "splitPk": "user_id",
                        "connection": [
                            {
                                "table": [
                                    "user_info"
                                ],
                                "jdbcUrl": ["jdbc:mysql://tudou-user-rds.rds.cn-beiging.aliyun.com:3163/meta"]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "endpoint":"http://tudou-user.ots.cn-beiging.aliyun.com",
                        "accessId":"testaccessid",
                        "accessKey":"testaccesskey",
                        "instanceName":"tudou-user",
                        "table":"user_info",
                        "primaryKey" : [
                            {"name":"user_id", "type":"string"}
                        ],
                        "column" : [
                            {"name":"type", "type":"string"},
                            {"name":"desc", "type":"string"},
                            {"name":"instance_count", "type":"int"},
                            {"name":"create_time", "type":"int"}
                        ],
                        "writeMode" : "PutRow"
                    }
                }
            }
        ]
    }
}

性能调优

  • 前期工作和注意点

    • 因为我们数据量比较大,所以在启动迁移之前,我们通过工单主动联系了TableStore的工程师,帮我们把表按照第一列的数据范围拆分了多个分区,加快了数据的导入速度。
    • 在测试的时候,切记不要构造大量的数据,我们在测试的时候没有太注意,测试工程师搞了1千万的数据导入到TableStore中,因为TableStore是按量计费的,导致多交了很多钱!!
  • 在迁移的数据的时候,怎么调整速度?

    • 如果觉得导入速度太慢,可以适当的加大Channel数目,Channel的意义表示启动Datax的并发任务数目
  • 我在源DB上的数据类型是string,到目标源是否能强转为Int?

    • Datax使用了标准的Java转义方式,细节可以参考Java转义
上一篇:表格存储(TableStore)新功能Stream初探


下一篇:表格存储的宽行流式读功能