flink-connector-jdbc
Connector 组件已经内置了 MySQL 和 PostgreSQL 的驱动程序。若需要连接 Oracle 等其他的数据库,可通过附加自定义程序包的方式,上传相应的 JDBC Driver 的 JAR 包。Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 支持 |
1.16 | 支持 |
CREATE TABLE `jdbc_source_table` (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'scan.partition.column' = 'id', -- 分区扫描列名'scan.partition.num' = '2', -- 分区数量'scan.partition.lower-bound' = '0', -- 首个分区的最小值'scan.partition.upper-bound' = '100', -- 最后一个分区的最大值'scan.fetch-size' = '1' -- 每次从数据库读取时,批量获取的行数);
CREATE TABLE `jdbc_dim_table` (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'lookup.cache.max-rows' = '100', -- 读缓存大小'lookup.cache.ttl' = '5000' -- 读缓存的 TTL);
CREATE TABLE `jdbc_sink_table` (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
CREATE TABLE `jdbc_upsert_sink_table` (`id` INT PRIMARY KEY NOT ENFORCED,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-upsert-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 连接数据库时,需要填写 'jdbc' 。 |
url | 是 | 无 | JDBC 数据库的连接 URL。 |
table-name | 是 | 无 | 数据库表名。 |
driver | 否 | 无 | JDBC Driver 的类名。如果不输入,则自动从 url 中推断。 |
username | 否 | 无 | 数据库用户名。 'username' 和'password' 必须同时使用。 |
password | 否 | 无 | 数据库密码。 |
scan.partition.column | 否 | 无 | |
scan.partition.num | 否 | 无 | 分区扫描启用后,指定分区数。 |
scan.partition.lower-bound | 否 | 无 | 分区扫描启用后,指定首个分区的最小值。 |
scan.partition.upper-bound | 否 | 无 | 分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size | 否 | 0 | 每次从数据库读取时,批量获取的行数。默认为0,表示一行一行读取,效率较低(吞吐量不高)。 |
lookup.cache.max-rows | 否 | 无 | 查询缓存(Lookup Cache)中最多缓存的数据条数。 |
lookup.cache.ttl | 否 | 无 | 查询缓存中每条记录最长的缓存时间。 |
lookup.max-retries | 否 | 3 | 数据库查询失败时,最多重试的次数。 |
sink.buffer-flush.max-rows | 否 | 100 | 批量输出时,缓存中最多缓存多少数据。如果设置为0,表示禁止输出缓存。 |
sink.buffer-flush.interval | 否 | 1s | 批量输出时,每批次最大的间隔(毫秒)。如果 'sink.buffer-flush.max-rows' 设为 '0' ,而这个选项不为零,则说明启用纯异步输出功能,即数据输出到算子、从算子最终写入数据库这两部分线程完全解耦。 |
sink.max-retries | 否 | 3 | 数据库写入失败时,最多重试的次数。 |
sink.ignore-delete | 否 | false | 是否忽略 delete 操作。 |
CREATE TABLE `jdbc_source_table` (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'scan.partition.column' = 'id', -- 分区扫描列名'scan.partition.num' = '2', -- 分区数量'scan.partition.lower-bound' = '0', -- 首个分区的最小值'scan.partition.upper-bound' = '100', -- 最后一个分区的最大值'scan.fetch-size' = '1' -- 每次从数据库读取时,批量获取的行数);CREATE TABLE `jdbc_upsert_sink_table` (`id` INT PRIMARY KEY NOT ENFORCED,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'my-upsert-table', -- 需要写入的数据表'username' = 'admin', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'MyPa$$w0rd', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);insert into jdbc_upsert_sink_table select * from jdbc_source_table;
PRIMARY KEY NOT ENFORCED
约束。INSERT .. ON DUPLICATE KEY UPDATE ..
语法,常见版本的 MySQL 均支持该语法。INSERT .. ON CONFLICT .. DO UPDATE SET ..
语法,该语法需要 PostgreSQL 9.5 及以上版本才可支持。MERGE .. INTO .. USING ON .. WHEN UPDATE .. WHEN INSERT ..
语法,该语法需要 Oracle 9i 及以上版本才可以支持。scan.partition
开头的参数都必须指定,否则会报错。scan.partition.upper-bound
指定的最大值和 scan.partition.lower-bound
指定的最小值,指的是每个分区的步长,不会影响最终读取的数据条数和精确性。lookup.cache.max-rows
和 lookup.cache.ttl
两个参数来启用该功能。jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true
jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true¤tSchema=数据库的Schema
本页内容是否解决了您的问题?