Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持 |
CREATE TABLE postgres_cdc_source_table (id INT,name STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'postgres-cdc', -- 固定值 'postgres-cdc''hostname' = 'yourHostname', -- 数据库的 IP'port' = '5432', -- 数据库的访问端口'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)'password' = 'yourPassWord', -- 数据库访问的密码'database-name' = 'yourDatabaseName', -- 需要同步的数据库'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符);
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值为 postgres-cdc |
hostname | Postgres 数据库的 IP 地址或者 Hostname | 是 | - |
username | Postgres 数据库服务的用户名 | 是 | 有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户 |
password | Postgres 数据库服务的密码 | 是 | - |
database-name | Postgres 数据库名称 | 是 | - |
schema-name | Postgres Schema 名称 | 是 | Schema 名称支持正则表达式以读取多个 Schema 的数据 |
table-name | Postgres 表名 | 是 | 表名支持正则表达式以读取多个表的数据 |
port | Postgres 数据库服务的端口号 | 否 | 默认值为5432 |
decoding.plugin.name | Postgres Logical Decoding 插件名称 | 否 | 根据 Postgres 服务上安装的插件确定。支持的插件列表如下: decoderbufs(默认值) wal2json wal2json_rds wal2json_streaming wal2json_rds_streaming pgoutput |
debezium.* | Debezium 属性参数 | 否 | 从更细粒度控制 Debezium 客户端的行为。例如 'debezium.slot.name' = 'xxxx' ,以避免出现 PSQLException: ERROR: replication slot "dl_test" is active for PID 19997 详情请参见 配置属性 |
Postgres CDC 字段类型 | Flink 字段类型 |
SMALLINT | SMALLINT |
| INT2 |
| SMALLSERIAL |
| SERIAL2 |
INTEGER | INT |
| SERIAL |
BIGINT | BIGINT |
| BIGSERIAL |
REAL | FLOAT |
| FLOAT4 |
FLOAT8 | DOUBLE |
| DOUBLE PRECISION |
NUMERIC(p, s) | DECIMAL(p, s) |
| DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
| CHARACTER(n) |
| VARCHAR(n) |
| CHARACTER VARYING(n) |
| TEXT |
BYTEA | BYTES |
CREATE TABLE postgres_cdc_source_table (id INT,name STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'postgres-cdc', -- 固定值 'postgres-cdc''hostname' = 'yourHostname', -- 数据库的 IP'port' = '5432', -- 数据库的访问端口'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)'password' = 'yourPassWord', -- 数据库访问的密码'database-name' = 'yourDatabaseName', -- 需要同步的数据库'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符);CREATE TABLE `print_table` (`id` INT,`name` STRING) WITH ('connector' = 'print');insert into print_table select * from postgres_cdc_source_table;
CREATE ROLE debezium_user REPLICATION LOGIN;GRANT USAGE ON SCHEMA schema_name TO debezium_user;GRANT USAGE ON DATABASE schema_name TO debezium_user;GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
本页内容是否解决了您的问题?