Flink 版本 | 说明 |
1.11 | 暂不支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持 |
CREATE TABLE `jdbc_sink_table` (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 PG 连接参数'table-name' = 'my-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
CREATE TABLE `jdbc_upsert_sink_table` (`id` INT PRIMARY KEY NOT ENFORCED,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 PG 连接参数'table-name' = 'my-upsert-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 连接数据库时,需要填写 'jdbcPG' 。 |
url | 是 | 无 | JDBC 数据库的连接 URL。 |
table-name | 是 | 无 | 数据库表名。 |
driver | 否 | 无 | JDBC Driver 的类名。如果不输入,则自动从 url 中推断。 |
username | 否 | 无 | 数据库用户名。 'username' 和'password' 必须同时使用。 |
password | 否 | 无 | 数据库密码。 |
scan.partition.column | 否 | 无 | |
scan.partition.num | 否 | 无 | 分区扫描启用后,指定分区数。 |
scan.partition.lower-bound | 否 | 无 | 分区扫描启用后,指定首个分区的最小值。 |
scan.partition.upper-bound | 否 | 无 | 分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size | 否 | 0 | 每次从数据库读取时,批量获取的行数。默认为0,表示一行一行读取,效率较低(吞吐量不高)。 |
lookup.cache.max-rows | 否 | 无 | 查询缓存(Lookup Cache)中最多缓存的数据条数。 |
lookup.cache.ttl | 否 | 无 | 查询缓存中每条记录最长的缓存时间。 |
lookup.max-retries | 否 | 3 | 数据库查询失败时,最多重试的次数。 |
sink.buffer-flush.max-rows | 否 | 100 | 批量输出时,缓存中最多缓存多少数据。如果设置为0,表示禁止输出缓存。 |
sink.buffer-flush.interval | 否 | 1s | 批量输出时,每批次最大的间隔(毫秒)。如果 'sink.buffer-flush.max-rows' 设为 '0' ,而这个选项不为零,则说明启用纯异步输出功能,即数据输出到算子、从算子最终写入数据库这两部分线程完全解耦。 |
sink.max-retries | 否 | 3 | 数据库写入失败时,最多重试的次数。 |
write-mode | 否 | insert | 以 copy 模式写入,可选 insert/copy |
copy-delimiter | 否 | Ι | copy 模式下面,字段分割符 |
CREATE TABLE jdbc_upsert_sink_table (id INT ,age INT,name STRING,PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH (-- 指定数据库连接参数'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.0.0.2:5436/postgres','table-name' = 'my-upsert-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'password', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '300', -- 批量输出的条数'sink.buffer-flush.interval' = '100s' -- 批量输出的间隔);-- MySQL CDC Source。配合 flink-connector-mysql-cdc 使用。要求 MySQL 版本 >= 5.7-- 参见 https://www.tencentcloud.com/document/product/849/52698?from_cn_redirect=1CREATE TABLE mysql_cdc_source_table (id INT,age INT,name STRING,PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '10.0.0.6', -- 数据库的 IP'port' = '3360', -- 数据库的访问端口'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'password', -- 数据库访问的密码-- 'scan.incremental.snapshot.enabled' = 'false' -- 如果 source 表没有设置 PRIMARY Key,需要启用该设置'database-name' = 'database', -- 需要同步的数据库'table-name' = 'table' -- 需要同步的数据表名);INSERT INTO jdbc_upsert_sink_table select * from mysql_cdc_source_table;
PRIMARY KEY NOT ENFORCED
约束。INSERT .. ON CONFLICT .. DO UPDATE SET ..
语法,而是依赖于攒批 upsert +攒批 delete。该语法兼容 PostgreSQL 9.5以下版本。jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true¤tSchema=数据库的Schema
本页内容是否解决了您的问题?