tencent cloud

文档反馈

数据库 MongoDB

最后更新时间:2023-11-08 14:51:30

    介绍

    Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前仅支持 append 流。

    版本说明

    Flink 版本
    说明
    1.11
    不支持
    1.13
    支持
    1.14
    不支持
    1.16
    支持

    DDL定义

    CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
    ) WITH (
    'connector' = 'mongodb', -- 固定值 'mongodb'
    'database' = 'test', --数据库名
    'collection' = 'table1',--数据集合
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
    'batchSize' = '1024' -- 每次批量写入的条数
    );

    使用范围

    Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。

    WITH参数

    参数
    说明
    是否必填
    备注
    connector
    结果表类型
    固定值 mongodb
    database
    数据库名称
    -
    collection
    数据集合
    -
    uri
    MongoDB 连接串
    -
    batchSize
    每次批量写入的条数
    默认1024
    maxConnectionIdleTime
    连接超时时长
    默认值为60000,单位为毫秒

    代码示例

    CREATE TABLE random_source (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100', -- 每秒产生的数据条数
    'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
    'fields.user_id.start' = '1', -- 序列的起始值
    'fields.user_id.end' = '10000', -- 序列的终止值
    'fields.item_id.kind' = 'random', -- 无界的随机数
    'fields.item_id.min' = '1', -- 随机数的最小值
    'fields.item_id.max' = '1000', -- 随机数的最大值
    'fields.category_id.kind' = 'random', -- 无界的随机数
    'fields.category_id.min' = '1', -- 随机数的最小值
    'fields.category_id.max' = '1000', -- 随机数的最大值
    'fields.behavior.length' = '5' -- 随机字符串的长度
    );
    
    CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
    ) WITH (
    'connector' = 'mongodb', -- 固定值 'mongodb'
    'database' = 'test', --数据库名
    'collection' = 'table1',--数据集合
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
    'batchSize' = '1024' -- 每次批量写入的条数
    );
    
    insert into mongodb select * from random_source;

    注意事项

    Upsert

    MongoDB sink 暂不支持 upsert。

    用户权限

    MongoDB 的 User 必须拥有 database 的写权限。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持