tencent cloud

文档反馈

消息队列 TDMQ RabbitMQ

最后更新时间:2023-11-08 14:58:06

    介绍

    消息队列 TDMQ RabbitMQ(TDMQ for RabbitMQ,以下简称 RMQ)是一款腾讯自主研发的消息队列服务,支持 AMQP 0-9-1 协议,完全兼容开源 RabbitMQ 的各个组件,可以用作数据目的(Sink)。用户可以通过 Flink 算子把流数据导入到 RMQ 的某个 Queue 中。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持
    1.14
    不支持
    1.16
    不支持

    使用范围

    RMQ 支持用作数据目的表(Sink),暂不支持 Upsert 数据流。

    DDL 定义

    用作数据目的(Sink)

    JSON 格式输出

    CREATE TABLE `rmq_sink_json_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'rabbitmq', -- 必须为 'rabbitmq'
    'host' = 'xxxx', -- rabbitmq host
    'port' = 'xxxx', -- rabbitmq 端口
    'vhost' = '/', -- 虚拟主机
    'username' = 'xxxx', -- 用户名
    'password' = 'xxxx', -- 用户密码
    'exchange' = 'exchange', -- 交换机名
    'routing-key' = 'Key', -- 绑定 Key
    'format' = 'json', -- 定义数据格式(JSON 格式)
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错
    );

    CSV 格式输出

    CREATE TABLE `rmq_sink_csv_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'rabbitmq', -- 必须为 'rabbitmq'
    'host' = 'xxxx', -- rabbitmq host
    'port' = 'xxxx', -- rabbitmq 端口
    'vhost' = '/', -- 虚拟主机
    'username' = 'xxxx', -- 用户名
    'password' = 'xxxx', -- 用户密码
    'exchange' = 'exchange', -- 交换机名
    'routing-key' = 'Key', -- 绑定 Key
    'format' = 'csv' -- 定义数据格式(CSV 格式)
    );

    WITH 参数

    通用 WITH 参数

    参数值
    必填
    默认值
    描述
    connector
    必须指定为 'rabbitmq'
    host
    队列所在 host。
    port
    5672
    rabbitmq 端口。
    vhost
    /
    虚拟主机。
    username
    guest
    角色名。
    password
    guest
    角色密码。
    queue
    队列名。
    exchange
    交换机名。
    routing-key
    交换机绑定 Key。
    delivery-mode
    1
    消息是否持久化,1:非持久化 2:持久化。
    expiration
    86400000
    消息过期时间,默认一天(单位:毫秒)。
    network-recovery-interval
    30s
    网络恢复间隔。
    automatic-recovery
    true
    rabbitmq 自动连接,默认自动连接。
    topology-recovery
    true
    rabbitmq 拓扑恢复,默认自动恢复。
    connection-timeout
    30s
    连接超时时间,默认30s。
    requested-frame-max
    0
    最初请求的最大通信帧大小,以字节为单位。0意味着无限制。
    requested-heartbeat
    60s
    请求心跳超时。
    prefetch-count
    0
    服务器发送的消息的最大数量,0 意味着无限制。(仅 1.13 支持)
    delivery-timeout
    30s
    提交队列超时时间。(仅 1.13 支持)
    format
    RMQ 消息的输入输出格式。目前支持'csv''json'

    JSON 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    json.fail-on-missing-field
    false
    如果为 true,则遇到缺失字段时,会让作业失败。如果为 false(默认值),则只会把缺失字段设置为 null 并继续处理。
    json.ignore-parse-errors
    false
    如果为 true,则遇到解析异常时,会把这个字段设置为 null 并继续处理。如果为 false,则会让作业失败。
    json.timestamp-format.standard
    SQL
    指定 JSON 时间戳字段的格式,默认是 SQL(格式是yyyy-MM-dd HH:mm:ss.s{可选精度})。也可以选择 ISO-8601,格式是 yyyy-MM-ddTHH:mm:ss.s{可选精度}

    CSV 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    csv.field-delimiter
    ,
    指定 CSV 字段分隔符,默认是半角逗号。
    csv.line-delimiter
    U&'\\000A'
    指定 CSV 的行分隔符,默认是换行符\\n,SQL 中必须用U&'\\000A'表示。如果需要使用回车符\\r,SQL 中必须使用U&'\\000D'表示。(仅 1.11 支持)
    csv.disable-quote-character
    false
    禁止字段包围引号。如果为 true,则 'csv.quote-character' 选项不可用。
    csv.quote-character
    ''
    字段包围引号,引号内部的作为整体看待。默认是''
    csv.ignore-parse-errors
    false
    忽略处理错误。对于无法解析的字段,会输出为 null。
    csv.allow-comments
    false
    忽略 # 开头的注释行,并输出为空行(请务必将 csv.ignore-parse-errors 设为 true)。
    csv.array-element-delimiter
    ;
    数组元素的分隔符,默认是;
    csv.escape-character
    指定转义符,默认禁用转义。
    csv.null-literal
    将指定的字符串看作 null 值。

    代码示例

    -- 提示:请将参数替换成所属集群的信息
    CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'source_table_name',
    'username' = 'username',
    'password' = 'password'
    );
    
    CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH (
    'connector' = 'rabbitmq',
    'host' = 'host',
    'port' = 'port',
    'vhost' = 'vhost',
    'username' = 'username',
    'password' = 'password',
    'queue' = 'queue-name',
    'exchange'='exchange',
    'routing-key'='key',
    'format' = 'json'
    );
    insert into rabbitmq_sink_json_table select * from rabbitmq_source_json_table;
    注意
    RMQ 作为数据目的表(Sink)使用的时候,需要注意往 RMQ 写入数据时有小概率会重复写入。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持