tencent cloud

文档反馈

数据库 MongoDB CDC

最后更新时间:2023-11-08 14:55:00

    介绍

    MongoDB 的 CDC 源表(即 MongoDB 的流式源表),Connector 会自动跟踪 MongoDB 副本集或分片集群,以获取数据库和集合中的文档更改。

    版本说明

    Flink 版本
    说明
    1.11
    不支持
    1.13
    支持
    1.14
    支持
    1.16
    支持

    使用范围

    MongoDB CDC 只支持作为源表,MongoDB CDC 支持4.0、4.2、5.0版本,MongoDB 集群必须是副本集或者分片集群。

    DDL 定义

    -- register a MongoDB table 'products' in Flink SQL
    CREATE TABLE mongo_cdc_source_table (
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
    );

    WITH 参数

    参数
    说明
    是否必填
    备注
    connector
    源表类型
    固定值为 mongodb-cdc
    hosts
    MongoDB 数据库的 IP 端口对
    -
    username
    MongoDB 数据库服务的用户名
    -
    password
    MongoDB 数据库服务的密码
    -
    database
    MongoDB 数据库名称
    -
    collection
    MongoDB Collection 名称
    -
    connection.options
    MongoDB 的 连接选项。有多个时,使用&连接,例如 relicaSet=test&connectTimeoutMS=300000
    -
    errors.tolerance
    是否忽略错误记录,接受 none 或者 all。如果设置为 all, 忽略错误记录
    none
    errors.log.enable
    是否需要把错误操作打印到日志文件
    默认值为 true
    copy.existing
    是否复制库中原有的数据,如果在复制期间对数据有更改,会在数据复制完成后应用更改
    默认值为 true
    copy.existing.pipeline
    当复制原有数据的时候,可以通过这个参数设置筛选条件。例如[{"$match": {"closed": "false"}}],只会复制 closed 为 false 的 记录。用法参考 $match (aggregation)
    -
    copy.existing.max.threads
    执行数据复制时要使用的线程数
    默认值为 Processors Count
    copy.existing.queue.size
    复制数据时要使用的队列的最大大小
    默认值为16000
    poll.max.batch.size
    每次拉取数据的最大数量。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
    默认值为1000
    poll.await.time.ms
    拉取数据的时间间隔。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
    默认值为1500
    heartbeat.interval.ms
    发送心跳消息时间间隔,以毫秒为单位。使用0禁用
    默认值为0
    说明
    Note:当数据流变化慢的时候,建议把 heartbeat.interval.ms 设置为一个合适的值,心跳会推送 resumeToken,防止当 Flink job 从 checkpoint 或者 savepoint 恢复的时候,resumeToken 已经过期。

    类型映射

    MongoDB 字段类型
    Flink 字段类型
    -
    TINYINT
    -
    SMALLINT
    Int
    INT
    Long
    BIGINT
    -
    FLOAT
    Double
    DOUBLE
    Decimal128
    DECIMAL(p, s)
    Boolean
    BOOLEAN
    DateTimestamp
    DATE
    DateTimestamp
    TIME
    Date
    TIMESTAMP(3)TIMESTAMP_LTZ(3)
    Timestamp
    TIMESTAMP(0)TIMESTAMP_LTZ(0)
    String
    ObjectId
    UUID
    Symbol
    MD5
    JavaScript
    Regex
    STRING
    BinData
    BYTES
    Object
    ROW
    Array
    ARRAY
    DBPointer
    ROW<$ref STRING, $id STRING>
    Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
    Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
    ...

    代码示例

    CREATE TABLE mongo_cdc_source_table (
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
    );
    CREATE TABLE `print_table` (
    `id` STRING,
    `name` STRING,
    `currency` STRING
    ) WITH (
    'connector' = 'print'
    );
    insert into print_table select _id, name, price.currency from mongo_cdc_source_table;

    注意事项

    用户权限

    MongoDB 的 User 必须有 changeStream 和 read 权限。
    use admin;
    db.createUser(
    {
    user: "flinkuser",
    pwd: "flinkpw",
    roles: [
    { role: "read", db: "admin" },
    { role: "readAnyDatabase", db: "admin" }
    ]
    }
    );

    并行度

    任务的并行度只支持为1。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持