TDMQ for RabbitMQ
Last updated: 2023-11-08 14:55:24

Overview

TDMQ for RabbitMQ (RMQ) is Tencent's proprietary message queue service. It supports the AMQP 0-9-1 protocol and is compatible with all components of open-source RabbitMQ. RMQ can be used as a sink: stream data processed by Flink operators can be written to an RMQ queue.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Supported |
| 1.13 | Supported |
| 1.14 | Unsupported |
| 1.16 | Unsupported |

Limits

RMQ can be used as a sink. It does not support upsert streams.
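
The difference between an append-only stream and an upsert stream can be sketched as follows. This is a minimal illustration, assuming a hypothetical source table `orders_src` and hypothetical RMQ sink tables `rmq_sink` and `rmq_sink_counts`; none of these names come from the connector itself. In Flink SQL, a plain projection or filter over an append-only source stays append-only, while a non-windowed aggregation keeps revising earlier results and therefore produces an updating stream.

```sql
-- Append-only: each input row yields at most one output row that is never revised.
-- This shape of query is suitable for an RMQ sink.
INSERT INTO `rmq_sink`
SELECT `id`, `name`
FROM `orders_src`
WHERE `id` > 0;

-- Updating: the count for each name changes as new rows arrive, so Flink emits
-- updates to earlier results. A sink without upsert support, such as RMQ,
-- is expected to reject this query at validation time.
-- INSERT INTO `rmq_sink_counts`
-- SELECT `name`, COUNT(*) AS `cnt`
-- FROM `orders_src`
-- GROUP BY `name`;
```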

Defining a table in DDL

As a sink

JSON format

CREATE TABLE `rmq_sink_json_table` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'rabbitmq',                 -- Must be 'rabbitmq'.
  'host' = 'xxxx',                          -- The RMQ host.
  'port' = 'xxxx',                          -- The RMQ port.
  'vhost' = '/',                            -- The virtual host.
  'username' = 'xxxx',                      -- The username.
  'password' = 'xxxx',                      -- The password.
  'exchange' = 'exchange',                  -- The exchange name.
  'routing-key' = 'Key',                    -- The routing key bound to the exchange.
  'format' = 'json',                        -- The data format (JSON).
  'json.fail-on-missing-field' = 'false',   -- If 'false', missing fields do not cause an error.
  'json.ignore-parse-errors' = 'true'       -- If 'true', all parse errors are ignored.
);

CSV format

CREATE TABLE `rmq_sink_csv_table` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'rabbitmq',   -- Must be 'rabbitmq'.
  'host' = 'xxxx',            -- The RMQ host.
  'port' = 'xxxx',            -- The RMQ port.
  'vhost' = '/',              -- The virtual host.
  'username' = 'xxxx',        -- The username.
  'password' = 'xxxx',        -- The password.
  'exchange' = 'exchange',    -- The exchange name.
  'routing-key' = 'Key',      -- The routing key bound to the exchange.
  'format' = 'csv'            -- The data format (CSV).
);

WITH parameters

Common WITH parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | - | Must be 'rabbitmq'. |
| host | Yes | - | The host of the queue. |
| port | Yes | 5672 | The RMQ port. |
| vhost | Yes | / | The virtual host. |
| username | Yes | guest | The username (role name). |
| password | Yes | guest | The password of the role. |
| queue | No | - | The queue name. |
| exchange | Yes | - | The exchange name. |
| routing-key | No | - | The routing key bound to the exchange. |
| delivery-mode | No | 1 | Whether messages are persistent. 1: No; 2: Yes. |
| expiration | No | 86400000 | The message validity period in milliseconds. The default is one day. |
| network-recovery-interval | No | 30s | The network recovery interval. |
| automatic-recovery | No | true | Whether to recover the connection to RMQ automatically. Enabled by default. |
| topology-recovery | No | true | Whether to recover the RMQ topology automatically. Enabled by default. |
| connection-timeout | No | 30s | The connection timeout period. The default is 30 seconds. |
| requested-frame-max | No | 0 | The initially requested maximum frame size in bytes. 0 means no limit. |
| requested-heartbeat | No | 60s | The heartbeat timeout period. |
| prefetch-count | No | 0 | The maximum number of messages the server can send. 0 means no limit. Supported only in Flink 1.13. |
| delivery-timeout | No | 30s | The commit timeout period. Supported only in Flink 1.13. |
| format | Yes | - | The input and output format of RMQ messages. Valid values include 'csv' and 'json'. |
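
As an illustration of how the optional parameters combine, the sketch below defines a sink that writes persistent messages with a one-hour validity period and shorter connection timeouts. The table name `rmq_sink_persistent` and all values are placeholders chosen for this example, and the duration syntax ('10s', '5s') is assumed to follow the style of the defaults listed above.

```sql
CREATE TABLE `rmq_sink_persistent` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'rabbitmq',
  'host' = 'xxxx',
  'port' = '5672',
  'vhost' = '/',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'exchange' = 'exchange',
  'routing-key' = 'Key',
  'delivery-mode' = '2',               -- 2: messages are persistent.
  'expiration' = '3600000',            -- Message validity period: one hour, in milliseconds.
  'connection-timeout' = '10s',        -- Assumed duration syntax, matching the '30s' default.
  'network-recovery-interval' = '5s',  -- Assumed duration syntax, matching the '30s' default.
  'format' = 'json'
);
```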

WITH parameters for JSON

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| json.fail-on-missing-field | No | false | If true, the job fails when a field is missing. If false (default), missing fields are set to null and the job continues. |
| json.ignore-parse-errors | No | false | If true, a field that fails to parse is set to null and the job continues. If false, the job fails on a parse error. |
| json.timestamp-format.standard | No | SQL | The JSON timestamp format. With the default value SQL, the format is yyyy-MM-dd HH:mm:ss.s{precision}. With ISO-8601, the format is yyyy-MM-ddTHH:mm:ss.s{precision}. |
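
A minimal sketch of a JSON sink that switches the timestamp format to ISO-8601 is shown here. The table name `rmq_sink_json_ts` and the column `event_time` are hypothetical. Based on the description above, `event_time` values would be written in the form yyyy-MM-ddTHH:mm:ss.s{precision} rather than the default SQL form.

```sql
CREATE TABLE `rmq_sink_json_ts` (
  `id` INT,
  `event_time` TIMESTAMP(3)   -- Hypothetical timestamp column.
) WITH (
  'connector' = 'rabbitmq',
  'host' = 'xxxx',
  'port' = 'xxxx',
  'vhost' = '/',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'exchange' = 'exchange',
  'routing-key' = 'Key',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'   -- Default is 'SQL'.
);
```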

WITH parameters for CSV

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| csv.field-delimiter | No | , | The field delimiter. The default is a comma. |
| csv.line-delimiter | No | U&'\000A' | The line delimiter. The default is \n (in SQL, you must use U&'\000A'). You can also set it to \r (in SQL, use U&'\000D'). Supported only in Flink 1.11. |
| csv.disable-quote-character | No | false | Whether to disable quote characters. If true, 'csv.quote-character' cannot be used. |
| csv.quote-character | No | '' | The quote character. Text enclosed in quote characters is treated as a single value. The default value is ''. |
| csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If true, fields that fail to parse are set to null. |
| csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines. If true, also set csv.ignore-parse-errors to true. |
| csv.array-element-delimiter | No | ; | The array element delimiter. The default is a semicolon. |
| csv.escape-character | No | - | The escape character. Escape characters are disabled by default. |
| csv.null-literal | No | - | The string that is treated as null. |
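
The CSV options can be combined in the same way. The following sketch, which assumes a hypothetical table name `rmq_sink_csv_custom`, uses a pipe as the field delimiter and the literal NULL to represent null values; both values are illustrative choices rather than recommendations.

```sql
CREATE TABLE `rmq_sink_csv_custom` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'rabbitmq',
  'host' = 'xxxx',
  'port' = 'xxxx',
  'vhost' = '/',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'exchange' = 'exchange',
  'routing-key' = 'Key',
  'format' = 'csv',
  'csv.field-delimiter' = '|',   -- Single-character delimiter instead of the default comma.
  'csv.null-literal' = 'NULL'    -- The string NULL is treated as null.
);
```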

Example

-- Replace the parameter values below with the values for your own cluster.
-- Source table: reads rows from MySQL through the JDBC connector.
CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
  'table-name' = 'source_table_name',
  'username' = 'username',
  'password' = 'password'
);

-- Sink table: writes rows to RMQ in JSON format.
CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH (
  'connector' = 'rabbitmq',
  'host' = 'host',
  'port' = 'port',
  'vhost' = 'vhost',
  'username' = 'username',
  'password' = 'password',
  'queue' = 'queue-name',
  'exchange' = 'exchange',
  'routing-key' = 'key',
  'format' = 'json'
);

INSERT INTO rabbitmq_sink_json_table SELECT * FROM rabbitmq_source_json_table;

Note
When RMQ is used as a sink, messages may occasionally be written more than once.