基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

前言 

前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如 DTS。下面我们将对各种同步工具进行介绍,并以 DTS 为例展示同步操作。

同步工具介绍

DataX

DataX 是异构数据源离线同步的工具,支持多种异构数据源之间高效的数据同步。它使用 SQL 从数据库拉取数据,全内存操作。它具有一下特点:

  • 适合进行离线全量同步,不适合支持增量同步。
  • 可以通过编程来进行增量同步,但有一定延时,源表需要通过字段区分哪些记录为待同步字段,且无法捕获删除操作。
  • 单点执行。

因此,它适合中小用户,同步对实时性无太高要求的数据。其具体使用见:数据同步-从MySQL到Tablestore

Canal

Canal是阿里开源 CDC 工具,他可以获取 MySQL binlog 并解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库向其他数据库的同步工作。Tablestore 团队已经在 Canal 中实现了 Tablestore 适配器,可以支持将 MySQL 数据同步进入 Tablestore,具体细节请参考后续文章。

DTS

数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅及实时同步功能于一体,能够解决公共云、混合云场景下,远距离、毫秒级异步数据传输难题。其特点为:

  • 基于云部署,只需要简单配置就可以运行
  • 基于 binlog,可以实时同步数据
  • 费用相对于 DataX 高

因此,目前,中小型用户,对实时性要求没有很高的用户,可以使用 DataX 进行 MySQL 到 Tablestore 的同步。而企业级用户,或者对于延迟要求比较高的客户,推荐使用 DTS 进行数据同步。 本文会展示如何完成基于 DTS 从 MySQL 到 Tablestore 的同步系统的搭建。而这套同步系统正是订单数据上Tablestore 的第一步工作。

服务开通

创建 MySQL 并建表

在 RDS 上申请源的 MySQL 数据库,可以参考创建RDS MySQL实例。已经在 RDS 上或者 ECS 上拥有 MySQL 实例的同学可以忽略这一步。

在数据库中创建订单表 order_contract,建表语句如下:

CREATE TABLE `order_contract` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;

开通 Tablestore 服务

提供了三种方式创建 Tablestore 实例,读者请*选择一种创建。

阿里云 CLI (POP Client) 创建按量模式实例

通过阿里云官网下载安装并配置阿里云 CLI。具体参考 阿里云 CLI。完毕后,打开命令行,输入

aliyun ots InsertInstance --endpoint ots.cn-hangzhou.aliyuncs.com  --InstanceName test-20210609

用于创建实例。其中 endpoint 填入实例所在地域域名。然后在 Tabelstore 控制台即可看到创建出的实例如图。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

在 Cloud Shell 中可以同样使用此指令建立 Tablestore 实例,Cloud Shell 地址见 Cloud Shell 地址

控制台创建按量模式实例

进入Tablestore首页。点击进入管理控制台

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

点击创建实例

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

选择按量模式。填入实例名称

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

点击确定,完成创建。然后可以在控制台首页看到对应实例。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

控制台创建预留模式实例

此创建过程与控制台创建按量模式实例基本相同,只是在选择购买方式时选择预留模式。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

开通配置 DTS

进入DTS首页。点击立即购买

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

商品类型选择数据传输服务 Tablestore(后付费),功能选择数据同步,源实例选择 MySQL ,目标实例选择 Tablestore ,同步拓扑选择单向同步

若有压测等需要大量同步记录的需求,同步链路规格参数可以适当选大,每种链路的传输数据能力可以在页面上查看。

点击立即购买。弹出如下页面,勾选服务协议,然后点击立即开通,即完成购买。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

购买成功后,进入管理控制台,点击同步实例下面的数字“1”,可以看见尚未配置的实例信息。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

点击配置同步链路

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

配置源库 MySQL 和目标库 Tablestore 的信息。AccessKey 配置参考:获取AccessKey。配置完成后点击授权白名单并进入下一步

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

选择同步对象页,同步初始化这里,结构初始化全量数据初始化增量数据初始化这三个选项都要进行勾选。源库对象,选择源表 order_contract,点击中间红框中的按钮,将表传送到右侧“已选中对象”。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

点击编辑

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

将 pay_time 的数据类型映射为 Integer。这样在 Tablestore 中的 pay_time字段是以微秒为单位的时间戳。点击确定。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇 

配置完成后,向下滑动窗口,点击“下一步”进入高级配置。继续点击预检查并启动,进行启动。在控制台的数据同步页,可以看到刚刚配置的同步任务。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

数据同步测试

字段说明

通过前面,我们申请好了充当订单库的 MySQL 数据库,Tablestore 实例,并且搭建了 DTS 任务从 MySQL 向Tablestore 同步数据。下面,我们简要写一个 Java 程序,持续向 MySQL 中写入订单数据,以验证 DTS 的持续同步能力。生成的订单记录,各字段加入一些随机性,以构造更加真实的测试数据。各字段生成逻辑见表。

字段

字段含义

取值说明

oId

订单号

使用当前时间戳 + c_id,例如1623228187366_user2

create_time

下单时间

取当前时间

pay_time

支付时间

取当前时间,不做更细化仿真,假设每笔订单下单同时支付。

has_paid

是否已经支付

设定为true,这里不对此字段进行仿真。

c_id

消费者id

取一千万以内的随机整数,假设有一千万消费者。消费者id为“user” + id格式,比如“user1”

c_name

消费者姓名

使用“客户” + 消费者id的格式,比如1号消费者,姓名为“客户1”

p_brand

产品品牌

格式为“品牌id”,id为5000以内随机整数,仿真5000个品牌

p_count

产品数量

1到10取随机整数

p_id

产品id

格式为"store1_id", id为100以内随机数,假设每个店铺有100个产品,例如store1_1

p_name

产品名

格式为“产品” + p_id,比如“产品store4075_25”

p_price

产品价格

0到1000元随机浮点数

s_id

店铺id

5000以内随机整数,假设有5000家店铺。店铺id为“store” + id 格式,比如“store1”

s_name

店铺名称

使用“旗舰店” + id的格式,比如1号店铺,id为“store1”,店铺名称为“旗舰店1”

total_price

总价格

p_count * p_price

程序说明

搭建 Springboot 项目,其中创建订单代码如下,代码中包含随机生成参数的逻辑。

    // 创建订单
    private OrderContract createOrder() {
        long now = System.currentTimeMillis();
        LocalDateTime nowT = LocalDateTime.now();
        int cNumber = r.nextInt(1000 * 10000); // 一千万用户
        String userId = "user" + cNumber;
        String oId = now + "_" + userId;

        OrderContract item = new OrderContract();
        item.setoId(oId);
        item.setCreateTime(nowT);
        item.setPayTime(nowT);
        item.setHasPaid(true);
        item.setcId(userId);
        item.setcName("客户" + cNumber);

        int count = r.nextInt(10) + 1;
        item.setpCount(count);   // 商品数量

        double price = r.nextDouble() * 1000d;   // 单价1到1000
        item.setpPrice(price);

        int storeId = r.nextInt(5000); //5000个店铺
        item.setsId("store" + storeId);
        item.setsName("旗舰店" + storeId);
        item.setTotalPrice(item.getpPrice() * item.getpCount());

        int brandId = r.nextInt(5000);
        item.setpBrand("品牌" + brandId);

        int productId = r.nextInt(100);
        item.setpId(item.getsId() +"_" + productId);
        item.setpName("产品" + item.getpId());

        return item;
    }

批量获得订单并插入数据库代码如下,根据传入参数,插入数据库。

public void insertIntoOrders(int size) {
        System.out.println("start insert orders");
        List<OrderContract> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            OrderContract order = createOrder();
            list.add(order);
        }

        userMapper.batchInsert(list);
        System.out.println("finish insert orders.");
    }

插入数据库代码:

// Mybatis配置sql
<insert id="batchInsert" parameterType="List">
    insert into order_contract(oId,create_time,pay_time,has_paid,c_id,c_name,p_brand,
    p_count,p_id,p_name,p_price,s_id,s_name,total_price)
    values
    <foreach collection="list" index="index" item="item" separator=",">
        (#{item.oId},#{item.createTime},#{item.payTime},#{item.hasPaid},#{item.cId},#{item.cName},#{item.pBrand},
        #{item.pCount},#{item.pId},#{item.pName},#{item.pPrice},#{item.sId},#{item.sName},#{item.totalPrice})
    </foreach>
</insert>

循环执行批量插入数据库逻辑,以达到批量生成订单数据的目的。

   public void initOrders() {
        while (true) {
            try {
                int size = r.nextInt(1000);
                insertIntoOrders(size);

                Thread.sleep(4000L);
            } catch (InterruptedException e) {
               break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

同步结果

启动 Java 程序,可以在 Tablestore 中观察到新的订单记录持续的从 MySQL 库同步到 Tablestore 中。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

手动导入 SQL

用户也可以手动执行 SQL 插入测试数据。使用下面的存储过程可以直接通过 SQL 写入测试数据。因为逐条插入,插入性能要比程序里的批量插入慢。

DROP PROCEDURE if EXISTS test;
DELIMITER //
CREATE procedure test()
BEGIN
DECLARE i INT;
DECLARE userId INT;
DECLARE c INT;
DECLARE price DOUBLE;
DECLARE storeId INT;
DECLARE brandId INT;
DECLARE productId INT;
DECLARE c_id VARCHAR(255);
SET i = 0;
WHILE i<1000 DO    // 这里的值决定写入记录数
SET userId=CEILING(RAND()*1000*10000);
SET c=CEILING(RAND()*10);
SET price=RAND()*1000;
SET storeId=CEILING(RAND()*5000);
SET brandId=CEILING(RAND()*5000);
SET productId=CEILING(RAND()*100);
SET c_id=CONCAT("user",userId);

INSERT INTO test(oId,create_time,pay_time,
has_paid,c_id,c_name,
p_brand,p_count,p_id,
p_name,p_price,s_id,
s_name,total_price) VALUES
(CONCAT(unix_timestamp(now()),"_",c_id), now(), now(),
true,c_id,CONCAT("客户",userId),
CONCAT("品牌",brandId),c,CONCAT("store",storeId,"_",productId),
CONCAT("产品store",storeId,"_",productId),price,CONCAT("store",storeId),
CONCAT("旗舰店",storeId),p_price * c
);
SET i = i+1;
END WHILE;
END
//
DELIMITER ;
CALL test();

总结

基于 DTS,我们可以实现 MySQL 数据向 Tablestore 的实时同步。数据进入 Tablestore 后,我们可以利用 Tablestore 的特性进行搜索、分析等操作。我们会在后续文章中进行说明。

附录

代码 git 地址:https://github.com/aliyun/tablestore-examples

上一篇:基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇


下一篇:Vue中父组件与子组件之间的通信