-- Two MySQL CDC sources on the same MySQL instance (ip1:3306), reading different databases.
CREATE TABLE `source_1` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip1',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'db1',
  'table-name' = 'source_1'
);

CREATE TABLE `source_2` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_1` INT,
  `f_random_str` VARCHAR,
  `f_random_str_1` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip1',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'db2',
  'table-name' = 'source_2'
);

-- Logger sinks, one per source, with matching schemas.
CREATE TABLE `sink_1` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);

CREATE TABLE `sink_2` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_1` INT,
  `f_random_str` VARCHAR,
  `f_random_str_1` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);

-- Each INSERT reads from its own CDC source.
INSERT INTO sink_1 SELECT * FROM source_1;
INSERT INTO sink_2 SELECT * FROM source_2;
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
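Parameter | Default | Description |
table.optimizer.mysql-cdc-source.merge.enabled | false | Whether to enable MySQL CDC source reuse. When enabled, the system automatically tries to merge the MySQL CDC sources within one job that read from the same DB into a single source. |
table.optimizer.mysql-cdc-source.merge.default-group.splits | 1 | The number of sources into which MySQL CDC sources on the same DB instance are merged once merging is enabled. When a job reads a very large number of tables and a single merged source cannot meet performance requirements, raise this value with a SET command; Oceanus then divides the CDC sources as evenly as possible into the specified number of group splits. |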
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
SET table.optimizer.mysql-cdc-source.merge.default-group.splits=2;
INSERT INTO sink_1 SELECT * FROM source_1;
INSERT INTO sink_2 SELECT * FROM source_2;
INSERT INTO sink_3 SELECT * FROM source_3;
INSERT INTO sink_4 SELECT * FROM source_4;
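The job above refers to `source_3`, `source_4`, `sink_3`, and `sink_4`, which are not defined on this page. A minimal sketch of what those definitions might look like follows. It assumes, purely for illustration, that both extra sources share the schema of `source_1`, live on the same MySQL instance (`ip1:3306`), and read hypothetical databases `db3` and `db4`; adjust the columns and connection options to match the real tables.

```sql
-- Hypothetical definitions: the schema, 'db3'/'db4', and table names are assumptions.
CREATE TABLE `source_3` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip1',          -- same instance as source_1 and source_2
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'db3',     -- assumed database name
  'table-name' = 'source_3'
);

CREATE TABLE `source_4` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip1',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'db4',     -- assumed database name
  'table-name' = 'source_4'
);

-- Logger sinks matching the assumed source schema.
CREATE TABLE `sink_3` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);

CREATE TABLE `sink_4` (
  `f_sequence` INT,
  `f_random` INT,
  `f_random_str` VARCHAR,
  PRIMARY KEY (`f_sequence`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);
```

With four sources on one instance and `default-group.splits` set to 2, the parameter description suggests the sources would end up in two merged groups of roughly equal size; which source lands in which group is not specified here.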