tencent cloud

文档反馈

MySQL 数据实时或者批量写入

最后更新时间:2024-06-27 11:04:41
    MySQL 数据实时写入主要用于导入实时增量数据。批量写入主要用于导入表中存量数据。下面分别说明导入方法。

    MySQL 数据实时写入

    实时写入 Mysql 数据主要采用 Binlog Load 机制实现。Binlog Load 提供了一种使 Doris 增量同步用户对 Mysql 数据库的数据更新操作的 CDC(Change Data Capture)功能。

    适用场景

    INSERT/UPDATE/DELETE 支持
    过滤 Query
    暂不兼容 DDL 语句

    基本原理

    当前版本设计中,Binlog Load 需要依赖 canal 作为中间媒介,让 canal 伪造成一个从节点去获取 Mysql 主节点上的 Binlog 并解析,再由 Doris 去获取 Canal 上解析好的数据,主要涉及 Mysql 端、Canal 端以及 Doris 端,总体数据流向如下:
    +---------------------------------------------+
    | Mysql |
    +----------------------+----------------------+
    | Binlog
    +----------------------v----------------------+
    | Canal Server |
    +-------------------+-----^-------------------+
    Get | | Ack
    +-------------------|-----|-------------------+
    | FE | | |
    | +-----------------|-----|----------------+ |
    | | Sync Job | | | |
    | | +------------v-----+-----------+ | |
    | | | Canal Client | | |
    | | | +-----------------------+ | | |
    | | | | Receiver | | | |
    | | | +-----------------------+ | | |
    | | | +-----------------------+ | | |
    | | | | Consumer | | | |
    | | | +-----------------------+ | | |
    | | +------------------------------+ | |
    | +----+---------------+--------------+----+ |
    | | | | |
    | +----v-----+ +-----v----+ +-----v----+ |
    | | Channel1 | | Channel2 | | Channel3 | |
    | | [Table1] | | [Table2] | | [Table3] | |
    | +----+-----+ +-----+----+ +-----+----+ |
    | | | | |
    | +--|-------+ +---|------+ +---|------+|
    | +---v------+| +----v-----+| +----v-----+||
    | +----------+|+ +----------+|+ +----------+|+|
    | | Task |+ | Task |+ | Task |+ |
    | +----------+ +----------+ +----------+ |
    +----------------------+----------------------+
    | | |
    +----v-----------------v------------------v---+
    | Coordinator |
    | BE |
    +----+-----------------+------------------+---+
    | | |
    +----v---+ +---v----+ +----v---+
    | BE | | BE | | BE |
    +--------+ +--------+ +--------+
    如上,用户向 FE 提交一个数据同步作业。
    FE 会为每个数据同步作业启动一个 canal client,来向 canal server 端订阅并获取数据。
    client 中的 receive r将负责通过 Get 命令接收数据,每获取到一个数据 batch,都会由 consumer 根据对应表分发到不同的 channel,每个 channel 都会为此数据 batch 产生一个发送数据的子任务 Task。
    在 FE 上,一个 Task 是 channel 向 BE 发送数据的子任务,里面包含分发到当前 channel 的同一个 batch 的数据。
    channel 控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从 consumer 获取到多个 batch 的数据,因此会产生多个向BE发送数据的子任务 Task,在提交事务成功前,这些 Task 不会实际生效。
    满足一定条件时(例如超过一定时间、达到提交最大数据大小),consumer 将会阻塞并通知各个 channel 提交事务。
    当且仅当所有 channel 都提交成功,才会通过 Ack 命令通知 canal 并继续获取、消费数据。
    如果有任意 channel 提交失败,将会重新从上一次消费成功的位置获取数据并再次提交(已提交成功的 channel 不会再次提交以保证幂等性)。
    整个数据同步作业中,FE 通过以上流程不断的从 canal 获取数据并提交到 BE,来完成数据同步。

    配置 Mysql 端

    在 Mysql Cluster 模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在 Cluster 的多个节点间同步、备份都要通过 Binlog 日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
    注意
    目前必须要使用 Mysql 5.7及以上的版本才能支持 Binlog Load 功能。
    要打开 mysql 的二进制 binlog 日志功能,则需要编辑 my.cnf 配置文件设置一下。
    [mysqld]
    log-bin = mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式

    Mysql 端说明

    在 Mysql 上,Binlog 命名格式为 mysql-bin.000001、mysql-bin.000002... ,满足一定条件时 mysql 会去自动切分 Binlog 日志:
    mysql 重启了。
    客户端输入命令 flush logs。
    binlog 文件大小超过1G。
    要定位 Binlog 的最新的消费位置,可以通过 binlog 文件名和 position(偏移量)。例如,各个从节点上会保存当前消费到的 binlog 位置,方便随时断开连接、重新连接和继续消费。
    --------------------- ------------------
    | Slave | read | Master |
    | FileName/Position | <<<--------------------------- | Binlog Files |
    --------------------- ------------------
    对于主节点来说,它只负责写入 Binlog,多个从节点可以同时连接到一台主节点上,消费 Binlog 日志的不同部分,互相之间不会影响。 Binlog 日志支持两种主要格式(此外还有混合模式 mixed-based):
    statement-based格式: Binlog只保存主节点上执行的sql语句,从节点将其复制到本地重新执行
    row-based格式: Binlog会记录主节点的每一行所有列的数据的变更信息,从节点会复制并执行每一行的变更到本地
    第一种格式只写入了执行的 sql 语句,虽然日志量会很少,但是有下列缺点:
    1. 没有保存每一行实际的数据。
    2. 在主节点上执行的 UDF、随机、时间函数会在从节点上结果不一致。
    3. limit 语句执行顺序可能不一致。
    因此我们需要选择第二种格式,才能从 Binlog 日志中解析出每一行数据。 在 row-based 格式下,Binlog 会记录每一条 binlog event 的时间戳、server id、偏移量等信息,如下面一条带有两条 insert 语句的事务:
    begin;
    insert into canal_test.test_tbl values (3, 300);
    insert into canal_test.test_tbl values (4, 400);
    commit;
    对应将会有四条 binlog event,其中一条 begin event,两条 insert event,一条 commit event:
    SET TIMESTAMP=1538238301/*!*/;
    BEGIN
    /*!*/.
    # at 211935643
    # at 211935698
    #180930 0:25:01 server id 1 end_log_pos 211935698 Table_map: 'canal_test'.'test_tbl' mapped to number 25
    #180930 0:25:01 server id 1 end_log_pos 211935744 Write_rows: table-id 25 flags: STMT_END_F
    ...
    '/*!*/;
    ### INSERT INTO canal_test.test_tbl
    ### SET
    ### @1=1
    ### @2=100
    # at 211935744
    #180930 0:25:01 server id 1 end_log_pos 211935771 Xid = 2681726641
    ...
    '/*!*/;
    ### INSERT INTO canal_test.test_tbl
    ### SET
    ### @1=2
    ### @2=200
    # at 211935771
    #180930 0:25:01 server id 1 end_log_pos 211939510 Xid = 2681726641
    COMMIT/*!*/;
    如图所示,每条 Insert event 中包含了修改的数据。在进行 Delete/Update 操作时,一条 event 还能包含多行数据,使得 Binlog 日志更加的紧密。

    开启 GTID 模式(可选)

    一个全局事务 Id (global transaction identifier)标识出了一个曾在主节点上提交过的事务,在全局都是唯一有效的。开启了 Binlog 后,GTID 会被写入到 Binlog 文件中,与事务一一对应。 要打开 mysql 的 GTID 模式,则需要编辑 my.cnf 配置文件设置一下:
    gtid-mode=on // 开启gtid模式
    enforce-gtid-consistency=1 // 强制gtid和事务的一致性
    在 GTID 模式下,主服务器可以不需要 Binlog 的文件名和偏移量,就能很方便的索引事务、恢复数据、复制副本。 在 GTID 模式下,由于 GTID 的全局有效性,从节点将不再需要通过保存文件名和偏移量来定位主节点上的 Binlog 位置,而通过数据本身就可以定位了。在进行数据同步中,从节点会跳过执行任意被识别为已执行的 GTID 事务。 GTID 的表现形式为一对坐标, source_id标识出主节点,transaction_id表示此事务在主节点上执行的顺序(最大263-1)。
    GTID = source_id:transaction_id
    例如,在同一主节点上执行的第23个事务的 gtid 为:
    3E11FA47-71CA-11E1-9E33-C80AA9429562:23

    配置 Canal 端

    canal 是属于阿里巴巴 otter 项目下的一个子项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,用于解决跨机房同步的业务场景,建议使用 canal 1.1.5及以上版本,点击 下载地址,下载完成后,请按以下步骤完成部署。
    1. 解压 canal deployer。
    2. 在 conf 文件夹下新建目录并重命名,作为 instance 的根目录,目录名即后文提到的 destination。
    3. 修改 instance 配置文件(可拷贝conf/example/instance.properties):
    vim conf/{your destination}/instance.properties
    ## canal instance serverId
    canal.instance.mysql.slaveId = 1234
    ## mysql adress
    canal.instance.master.address = 127.0.0.1:3306
    ## mysql username/password
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    4. 启动
    sh bin/startup.sh
    5. 验证启动成功
    cat logs/{your destination}/{your destination}.log
    2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx
    2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

    Canal 端说明

    canal 通过伪造自己的 mysql dump 协议,去伪装成一个从节点,获取主节点的 Binlog 日志并解析。 canal server 上可启动多个 instance,一个 instance 可看作一个从节点,每个 instance 由下面几个部分组成:
    -------------------------------------------------
    | Server |
    | -------------------------------------------- |
    | | Instance 1 | |
    | | ----------- ----------- ----------- | |
    | | | Parser | | Sink | | Store | | |
    | | ----------- ----------- ----------- | |
    | | ----------------------------------- | |
    | | | MetaManager | | |
    | | ----------------------------------- | |
    | -------------------------------------------- |
    -------------------------------------------------
    parser:数据源接入,模拟 slave 协议和 master 进行交互,协议解析。
    sink:parser 和 store 链接器,进行数据过滤,加工,分发的工作。
    store:数据存储。
    meta manager:元数据管理模块。
    每个 instance 都有自己在 cluster 内的唯一标识,即 server Id。
    在 canal server 内,instance 用字符串表示,此唯一字符串被记为 destination,canal client 需要通过 destination 连接到对应的 instance。
    注意
    canal client 和 canal instance 是一一对应的,Binlog Load 已限制多个数据同步作业不能连接到同一个 destination。
    数据在 instance 内的流向是 binlog -> parser -> sink -> store
    instance 通过 parser 模块解析 binlog 日志,解析出来的数据缓存在 store 里面,当用户向FE提交一个数据同步作业时,会启动一个 canal client 订阅并获取对应 instance 中的 store 内的数据。 store 实际上是一个环形的队列,用户可以自行配置它的长度和存储空间。
    
    store通过三个指针去管理队列内的数据:
    1. get指针:get指针代表客户端最后获取到的位置。
    2. ack指针:ack指针记录着最后消费成功的位置。
    3. put指针:代表sink模块最后写入store成功的位置。
    canal client异步获取store中数据
    
    get 0 get 1 get 2 put
    | | | ...... |
    v v v v
    --------------------------------------------------------------------- store环形队列
    ^ ^
    | |
    ack 0 ack 1
    canal client 调用 get 命令时,canal server 会产生数据 batch 发送给 client,并右移 get 指针,client 可以获取多个 batch,直到 get 指针赶上 put 指针为止。
    当消费数据成功时,client 会返回 ck + batch Id 通知已消费成功了,并右移 ack 指针,store 会从队列中删除此 batch 的数据,腾出空间来从上游 sink 模块获取数据,并右移 put 指针。
    当数据消费失败时,client 会返回 rollback 通知消费失败,store 会将 get 指针重置左移到ack指针位置,使下一次 client 获取的数据能再次从 ack 指针处开始。
    和 Mysql 中的从节点一样,canal 也需要去保存 client 最新消费到的位置。canal 中所有元数据(如 GTID、Binlog 位置)都是由 MetaManager 去管理的,目前元数据默认以 json 格式持久化在 instance 根目录下的 meta.dat 文件内。

    基本操作

    配置目标表属性

    用户需要先在 Doris 端创建好与 Mysql 端对应的目标表。 Binlog Load 只能支持 Unique 类型的目标表,且必须激活目标表的 Batch Delete 功能。 开启 Batch Delete 的方法可以参考 ALTER TABLE PROPERTY 中的批量删除功能。 示例:
    --create Mysql table
    CREATE TABLE `source_test` (
    `id` int(11) NOT NULL COMMENT "",
    `name` int(11) NOT NULL COMMENT ""
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    
    -- create Doris table
    CREATE TABLE `target_test` (
    `id` int(11) NOT NULL COMMENT "",
    `name` int(11) NOT NULL COMMENT ""
    ) ENGINE=OLAP
    UNIQUE KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`) BUCKETS 8;
    
    -- enable batch delete
    ALTER TABLE target_test ENABLE FEATURE "BATCH_DELETE";
    注意
    Doris 表结构和 Mysql 表结构字段顺序必须保持一致。

    创建同步作业

    CREATE SYNC `demo`.`job`
    (
    FROM `source_test` INTO `target_test`
    (id,name)
    )
    FROM BINLOG
    (
    "type" = "canal",
    "canal.server.ip" = "127.0.0.1",
    "canal.server.port" = "11111",
    "canal.destination" = "xxx",
    "canal.username" = "canal",
    "canal.password" = "canal"
    );
    创建数据同步作业的详细语法可以连接到 Doris 后,HELP CREATE SYNC JOB 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。 语法:
    CREATE SYNC [db.]job_name
    (
    channel_desc,
    channel_desc
    ...
    )
    binlog_desc
    job_name job_name是数据同步作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。
    channel_desc channel_desc用来定义任务下的数据通道,可表示mysql源表到doris目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足 mysql 源表应该与 doris 目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。
    column_mapping column_mapping主要指 mysql 源表和 doris 目标表的列之间的映射关系,如果不指定,FE会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(例如增加一个 nullable 的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。
    binlog_desc binlog_desc中的属性定义了对接远端 Binlog 地址的一些必要信息,目前可支持的对接类型只有 canal 方式,所有的配置项前都需要加上 canal 前缀。
    1.1 canal.server.ip:canal server 的地址。
    1.2 canal.server.port:canal server 的端口。
    1.3 canal.destination:前文提到的 instance 的字符串标识。
    1.4 canal.batchSize:每批从 canal server 处获取的 batch 大小的最大值,默认8192。
    1.5 canal.username:instance 的用户名。
    1.6 canal.password:instance 的密码。
    1.7 canal.debug:设置为 true 时,会将 batch 和每一行数据的详细信息都打印出来,会影响性能。

    查看作业状态

    查看作业状态的具体命令和示例可以通过 SHOW SYNC JOB 命令查看。 返回结果集的参数意义如下:
    State 作业当前所处的阶段。作业状态之间的转换如下图所示:
    +-------------+
    create job | PENDING | resume job
    +-----------+ <-------------+
    | +-------------+ |
    +----v-------+ +-------+----+
    | RUNNING | pause job | PAUSED |
    | +-----------------------> |
    +----+-------+ run error +-------+----+
    | +-------------+ |
    | | CANCELLED | |
    +-----------> <-------------+
    stop job +-------------+ stop job
    system error
    作业提交之后状态为 PENDING,由 FE 调度执行启动 canal client 后状态变成 RUNNING,用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和恢复,操作后作业状态分别为 CANCELLED/PAUSED/RUNNING。 作业的最终阶段只有一个 CANCELLED,当作业状态变为 CANCELLED 后,将无法再次恢复。当作业发生了错误时,若错误是不可恢复的,状态会变成 CANCELLED,否则会变成 PAUSED。
    Channel 作业所有源表到目标表的映射关系。
    Status 当前 binlog 的消费位置(若设置了 GTID 模式,会显示 GTID),以及 doris 端执行时间相比 mysql 端的延迟时间。
    JobConfig 对接的远端服务器信息,如 canal server 的地址与连接 instance 的 destination。

    控制作业

    用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和恢复。可以通过 STOP SYNC JOBPAUSE SYNC JOB 以及 RESUME SYNC JOB

    相关参数

    CANAL 配置

    下面配置属于 canal 端的配置,主要通过修改 conf 目录下的 canal.properties 调整配置值。
    canal.ip canal server 的 IP 地址。
    canal.port canal server 的端口。
    canal.instance.memory.buffer.size canal 端的 store 环形队列的队列长度,必须设为2的幂次方,默认长度16384。此值等于 canal 端能缓存 event 数量的最大值,也直接决定了 Doris 端一个事务内所能容纳的最大 event 数量。建议将它改的足够大,防止 Doris 端一个事务内能容纳的数据量上限太小,导致提交事务太过频繁造成数据的版本堆积。
    canal.instance.memory.buffer.memunit canal 端默认一个 event 所占的空间,默认空间为1024 bytes。此值乘以 store 环形队列的队列长度等于 store 的空间最大值,例如 store 队列长度为16384,则 store 的空间为16MB。但是,一个 event 的实际大小并不等于此值,而是由这个 event 内有多少行数据和每行数据的长度决定的,例如一张只有两列的表的 insert event 只有30字节,但 delete event 可能达到数千字节,这是因为通常 delete event 的行数比 insert event 多。

    FE 配置

    下面配置属于数据同步作业的系统级别配置,主要通过修改 fe.conf 来调整配置值。
    sync_commit_interval_second 提交事务的最大时间间隔。若超过了这个时间 channel 中还有数据没有提交,consumer 会通知 channel 提交事务。
    min_sync_commit_size
    提交事务需满足的最小event数量。若Fe接收到的event数量小于它,会继续等待下一批数据直到时间超过了`sync_commit_interval_second `为止。默认值是10000个events,如果您想修改此配置,请确保此值小于canal端的`canal.instance.memory.buffer.size`配置(默认16384),否则在ack前Fe会尝试获取比store队列长度更多的event,导致store队列阻塞至超时为止。
    min_bytes_sync_commit 提交事务需满足的最小数据大小。若 FE 接收到的数据大小小于它,会继续等待下一批数据直到时间超过了sync_commit_interval_second为止。默认值是15MB,如果您想修改此配置,请确保此值小于 canal 端的canal.instance.memory.buffer.sizecanal.instance.memory.buffer.memunit的乘积(默认16MB),否则在 ack 前 Fe 会尝试获取比 store 空间更大的数据,导致 store 队列阻塞至超时为止。
    max_bytes_sync_commit 提交事务时的数据大小的最大值。若 Fe 接收到的数据大小大于它,会立即提交事务并发送已积累的数据。默认值是64MB,如果您想修改此配置,请确保此值大于 canal 端的 canal.instance.memory.buffer.sizecanal.instance.memory.buffer.memunit 的乘积(默认16MB)和 min_bytes_sync_commit
    max_sync_task_threads_num 数据同步作业线程池中的最大线程数量。此线程池整个FE中只有一个,用于处理 FE 中所有数据同步作业向 BE 发送数据的任务 task,线程池的实现在 SyncTaskPool 类。

    常见问题

    1. 修改表结构是否会影响数据同步作业? 会影响。数据同步作业并不能禁止alter table的操作,当表结构发生了变化,如果列的映射无法匹配,可能导致作业发生错误暂停,建议通过在数据同步作业中显式指定列映射关系,或者通过增加 Nullable 列或带 Default 值的列来减少这类问题。
    2. 删除了数据库后数据同步作业还会继续运行吗? 不会。删除数据库后的几秒日志中可能会出现找不到元数据的错误,之后该数据同步作业会被FE的定时调度检查时停止。
    3. 多个数据同步作业可以配置相同的ip:port + destination吗? 不能。创建数据同步作业时会检查 ip:port + destination 与已存在的作业是否重复,防止出现多个作业连接到同一个 instance 的情况。
    4. 为什么数据同步时浮点类型的数据精度在 Mysql 端和 Doris 端不一样? Doris 本身浮点类型的精度与 Mysql 不一样。可以选择用 Decimal 类型代替。

    MySQL 数据批量写入

    批量写入 Mysql 数据可选使用之前介绍过的几种导入方式:
    1. 使用 JDBC 同步数据:通过一个 JDBC 连接获取 mysql 中的数据,另一个 JDBC 连接 Doris 批量插入从 mysql 获取的数据。
    2. 通过外部表同步数据。
    3. mysql 表数据导出为 csv / json 格式的本地文件后使用 Stream load 导入。
    4. mysql 表数据导出为 csv / json 格式的本地文件并上传至 HDFS / S3 存储系统后使用 Broker load 导入。
    5. mysql 表数据导出为 csv / json 格式的本地文件并上传至 HDFS 存储系统后使用 Spark load 导入。

    注意事项

    上述每种导入方式都有各自的注意事项。
    1. JDBC 同步方式注意一条 sql 中插入的数据的条数;另外如果插入条数过多则不适合用于线上环境。
    2. ODBC 外部表不合适一次性导入大量的数据,建议分批多次导入。
    3. Stream load 建议的一次导入数据量在 1G 到 10G 之间,如果历史数据很大,建议切分后多次导入。
    4. 单次 Broker load 的数据量最好不要超过3G,如果历史数据很大,建议切分后多次导入。
    5. 如果数据超过3G建议直接使用 Spark load。太小的数据使用 spark load 性能稍差。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持