tencent cloud

文档反馈

数据库 PostgreSQL CDC

最后更新时间:2023-11-08 14:52:23

    介绍

    Postgres 的 CDC 源表(即 Postgres 的流式源表)用于依次读取 PostgreSQL 数据库全量快照数据和变更数据,保证不多读也不少读一条数据。即使发生故障,也能采用 Exactly Once 方式处理。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持
    1.14
    不支持
    1.16
    支持

    使用范围

    PostgreSQL CDC 只支持作为源表。支持的 PostgreSQL 版本为9.6及以上版本。

    DDL 定义

    CREATE TABLE postgres_cdc_source_table (
    id INT,
    name STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    ) WITH (
    'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
    'hostname' = 'yourHostname', -- 数据库的 IP
    'port' = '5432', -- 数据库的访问端口
    'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
    'password' = 'yourPassWord', -- 数据库访问的密码
    'database-name' = 'yourDatabaseName', -- 需要同步的数据库
    'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
    'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
    'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
    );

    WITH 参数

    参数
    说明
    是否必填
    备注
    connector
    源表类型
    固定值为 postgres-cdc
    hostname
    Postgres 数据库的 IP 地址或者 Hostname
    -
    username
    Postgres 数据库服务的用户名
    有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户
    password
    Postgres 数据库服务的密码
    -
    database-name
    Postgres 数据库名称
    -
    schema-name
    Postgres Schema 名称
    Schema 名称支持正则表达式以读取多个 Schema 的数据
    table-name
    Postgres 表名
    表名支持正则表达式以读取多个表的数据
    port
    Postgres 数据库服务的端口号
    默认值为5432
    decoding.plugin.name
    Postgres Logical Decoding 插件名称
    根据 Postgres 服务上安装的插件确定。支持的插件列表如下:
    decoderbufs(默认值)
    wal2json
    wal2json_rds
    wal2json_streaming
    wal2json_rds_streaming
    pgoutput
    debezium.*
    Debezium 属性参数
    从更细粒度控制 Debezium 客户端的行为。例如'debezium.slot.name' = 'xxxx',以避免出现 PSQLException: ERROR: replication slot "dl_test" is active for PID 19997 详情请参见 配置属性

    类型映射

    Postgres CDC 和 Flink 字段类型对应关系如下:
    Postgres CDC 字段类型
    Flink 字段类型
    SMALLINT
    SMALLINT
    INT2
    SMALLSERIAL
    SERIAL2
    INTEGER
    INT
    SERIAL
    BIGINT
    BIGINT
    BIGSERIAL
    REAL
    FLOAT
    FLOAT4
    FLOAT8
    DOUBLE
    DOUBLE PRECISION
    NUMERIC(p, s)
    DECIMAL(p, s)
    DECIMAL(p, s)
    BOOLEAN
    BOOLEAN
    DATE
    DATE
    TIME [(p)] [WITHOUT TIMEZONE]
    TIME [(p)] [WITHOUT TIMEZONE]
    TIMESTAMP [(p)] [WITHOUT TIMEZONE]
    TIMESTAMP [(p)] [WITHOUT TIMEZONE]
    CHAR(n)
    STRING
    CHARACTER(n)
    VARCHAR(n)
    CHARACTER VARYING(n)
    TEXT
    BYTEA
    BYTES

    代码示例

    CREATE TABLE postgres_cdc_source_table (
    id INT,
    name STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    ) WITH (
    'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
    'hostname' = 'yourHostname', -- 数据库的 IP
    'port' = '5432', -- 数据库的访问端口
    'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
    'password' = 'yourPassWord', -- 数据库访问的密码
    'database-name' = 'yourDatabaseName', -- 需要同步的数据库
    'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
    'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
    'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
    );
    
    CREATE TABLE `print_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'print'
    );
    insert into print_table select * from postgres_cdc_source_table;

    注意事项

    用户权限

    用来同步的用户至少具有 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。
    CREATE ROLE debezium_user REPLICATION LOGIN;
    GRANT USAGE ON SCHEMA schema_name TO debezium_user;
    GRANT USAGE ON DATABASE schema_name TO debezium_user;
    GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持