TDMQ for RabbitMQ

Last updated: 2023-11-08 14:55:24

    Overview

    TDMQ for RabbitMQ (RMQ) is Tencent's proprietary message queue service. It supports the AMQP 0-9-1 protocol and is compatible with all components of Apache RabbitMQ. It can be used as a sink. You can output stream data processed by Flink operators to an RMQ queue.

    Versions

    | Flink Version | Description |
    |---------------|-------------|
    | 1.11 | Supported |
    | 1.13 | Supported |
    | 1.14 | Unsupported |
    | 1.16 | Unsupported |

    Limits

    RMQ can be used as a sink. It does not support upsert streams.
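
    Because upsert streams are not supported, a query whose result is continuously updated (for example, a GROUP BY aggregation without a window) cannot be written to RMQ directly. The following is a minimal sketch of one append-only alternative, a tumbling-window count. The `datagen` source, the table names, and the column names are hypothetical and only illustrate the pattern; the connection values are placeholders.

    ```sql
    -- Hypothetical source with a processing-time attribute (assumption: the
    -- built-in 'datagen' connector is available in your cluster).
    CREATE TABLE `events_source` (
      `id` INT,
      `name` STRING,
      `proc_time` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );

    -- Hypothetical RMQ sink that receives one row per name per window.
    CREATE TABLE `rmq_sink_name_counts` (
      `name` STRING,
      `cnt` BIGINT
    ) WITH (
      'connector' = 'rabbitmq',
      'host' = 'xxxx',
      'port' = '5672',
      'vhost' = '/',
      'username' = 'xxxx',
      'password' = 'xxxx',
      'exchange' = 'exchange',
      'routing-key' = 'Key',
      'format' = 'json'
    );

    -- A windowed aggregation emits each result once when the window closes,
    -- so the output is insert-only rather than an upsert stream.
    INSERT INTO `rmq_sink_name_counts`
    SELECT `name`, COUNT(*) AS `cnt`
    FROM `events_source`
    GROUP BY `name`, TUMBLE(`proc_time`, INTERVAL '1' MINUTE);
    ```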

    Defining a table in DDL

    As a sink

    JSON format

    CREATE TABLE `rmq_sink_json_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.
    'host' = 'xxxx', -- The RMQ host.
    'port' = 'xxxx', -- The RMQ port.
    'vhost' = '/', -- The virtual host.
    'username' = 'xxxx', -- The username.
    'password' = 'xxxx', -- The password.
    'exchange' = 'exchange', -- The exchange name.
    'routing-key' = 'Key', -- The key bound.
    'format' = 'json', -- The data format (JSON).
    'json.fail-on-missing-field' = 'false', -- If this is 'false', missing fields do not cause an error.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );

    CSV format

    CREATE TABLE `rmq_sink_csv_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.
    'host' = 'xxxx', -- The RMQ host.
    'port' = 'xxxx', -- The RMQ port.
    'vhost' = '/', -- The virtual host.
    'username' = 'xxxx', -- The username.
    'password' = 'xxxx', -- The password.
    'exchange' = 'exchange', -- The exchange name.
    'routing-key' = 'Key', -- The key bound.
    'format' = 'csv' -- The data format (CSV).
    );

    WITH parameters

    Common WITH parameters

    | Option | Required | Default Value | Description |
    |--------|----------|---------------|-------------|
    | connector | Yes | - | Here, it should be 'rabbitmq'. |
    | host | Yes | - | The host of the queue. |
    | port | Yes | 5672 | The RMQ port. |
    | vhost | Yes | / | The virtual host. |
    | username | Yes | guest | The role name. |
    | password | Yes | guest | The password of the role. |
    | queue | No | - | The queue name. |
    | exchange | Yes | - | The exchange name. |
    | routing-key | No | - | The key bound by the exchange. |
    | delivery-mode | No | 1 | Whether messages are persistent. 1: No; 2: Yes. |
    | expiration | No | 86400000 | The message validity period (milliseconds), which is one day by default. |
    | network-recovery-interval | No | 30s | The network recovery interval. |
    | automatic-recovery | No | true | Whether to recover the connection to RMQ automatically. Automatic recovery is enabled by default. |
    | topology-recovery | No | true | Whether to recover RMQ topology automatically. Automatic topology recovery is enabled by default. |
    | connection-timeout | No | 30s | The connection timeout period, which is 30 seconds by default. |
    | requested-frame-max | No | 0 | The maximum frame size (bytes) for the first request. If this is 0, no limit will be set. |
    | requested-heartbeat | No | 60s | The heartbeat timeout period. |
    | prefetch-count | No | 0 | The maximum number of messages the server can send. If this is 0, no limit will be set. This option is only supported by Flink 1.13. |
    | delivery-timeout | No | 30s | The commit timeout period. This option is only supported by Flink 1.13. |
    | format | Yes | - | The input and output format of RMQ messages. Valid values include 'csv' and 'json'. |
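
    As an illustration of the optional parameters, the sketch below defines a sink that publishes persistent messages with a shorter validity period and shorter timeouts. The table name and all values are hypothetical; the duration syntax ('10s', '5s') is an assumption based on the default values listed for these options.

    ```sql
    CREATE TABLE `rmq_sink_persistent_table` (
      `id` INT,
      `name` STRING
    ) WITH (
      'connector' = 'rabbitmq',
      'host' = 'xxxx',
      'port' = '5672',
      'vhost' = '/',
      'username' = 'xxxx',
      'password' = 'xxxx',
      'exchange' = 'exchange',
      'routing-key' = 'Key',
      'delivery-mode' = '2',              -- 2: messages are persistent.
      'expiration' = '3600000',           -- Messages expire after one hour (milliseconds).
      'connection-timeout' = '10s',       -- Assumed duration syntax, as in the default '30s'.
      'network-recovery-interval' = '5s', -- Assumed duration syntax, as in the default '30s'.
      'format' = 'json'
    );
    ```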

    WITH parameters for JSON

    | Option | Required | Default Value | Description |
    |--------|----------|---------------|-------------|
    | json.fail-on-missing-field | No | false | If this is true, the job will fail when a field is missing. If this is false (default), missing fields will be set to null and the job will continue to run. |
    | json.ignore-parse-errors | No | false | If this is true, when there is a parse error, the field will be set to null and the job will continue to run. If this is false, the job will fail in case of a parse error. |
    | json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, and the format will be yyyy-MM-ddTHH:mm:ss.s{precision}. |
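
    The timestamp option can be sketched as follows, assuming a hypothetical sink table with a `created_at` column. With 'ISO-8601', timestamps are expected to follow the yyyy-MM-ddTHH:mm:ss.s{precision} pattern described above instead of the SQL pattern.

    ```sql
    CREATE TABLE `rmq_sink_json_ts_table` (
      `id` INT,
      `name` STRING,
      `created_at` TIMESTAMP(3) -- Hypothetical timestamp column.
    ) WITH (
      'connector' = 'rabbitmq',
      'host' = 'xxxx',
      'port' = '5672',
      'vhost' = '/',
      'username' = 'xxxx',
      'password' = 'xxxx',
      'exchange' = 'exchange',
      'routing-key' = 'Key',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601' -- Use the ISO-8601 pattern instead of the default SQL pattern.
    );
    ```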

    WITH parameters for CSV

    | Option | Required | Default Value | Description |
    |--------|----------|---------------|-------------|
    | csv.field-delimiter | No | , | The field delimiter, which is comma by default. |
    | csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must use U&'\000A'). You can also set it to \r (in SQL, you need to use U&'\000D'). This option is only supported by Flink 1.11. |
    | csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used. |
    | csv.quote-character | No | '' | The quote characters. Text inside quotes will be viewed as a whole. The default value is ''. |
    | csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure. |
    | csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well). |
    | csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
    | csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
    | csv.null-literal | No | - | The string that will be seen as null. |
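
    A minimal sketch of a CSV sink that overrides some of these options is shown below. The table name and the chosen values ('|' and 'NULL') are hypothetical and only illustrate how the options are set.

    ```sql
    CREATE TABLE `rmq_sink_csv_custom_table` (
      `id` INT,
      `name` STRING
    ) WITH (
      'connector' = 'rabbitmq',
      'host' = 'xxxx',
      'port' = '5672',
      'vhost' = '/',
      'username' = 'xxxx',
      'password' = 'xxxx',
      'exchange' = 'exchange',
      'routing-key' = 'Key',
      'format' = 'csv',
      'csv.field-delimiter' = '|',           -- Separate fields with | instead of a comma.
      'csv.disable-quote-character' = 'true', -- Do not quote values; 'csv.quote-character' must not be set.
      'csv.null-literal' = 'NULL'            -- The string NULL represents null values.
    );
    ```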

    Example

    -- Replace the parameter values below with the corresponding values for the cluster used.
    -- Source table: reads rows from MySQL through the JDBC connector.
    CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'source_table_name',
    'username' = 'username',
    'password' = 'password'
    );

    -- Sink table: writes each row to RMQ as a JSON message.
    CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH (
    'connector' = 'rabbitmq',
    'host' = 'host',
    'port' = 'port',
    'vhost' = 'vhost',
    'username' = 'username',
    'password' = 'password',
    'queue' = 'queue-name',
    'exchange' = 'exchange',
    'routing-key' = 'key',
    'format' = 'json'
    );

    -- Write every row of the source table to the RMQ sink.
    INSERT INTO `rabbitmq_sink_json_table` SELECT * FROM `rabbitmq_source_json_table`;

    Note
    When RMQ is used as a sink, a message may occasionally be written more than once.