Tencent Cloud Message Queue
Last updated: 2023-11-08 14:58:27

Overview

Cloud Message Queue (CMQ) is a distributed message queue system built on Tencent's in-house messaging engine. It can be used as a source or a sink. You can ingest stream data into a CMQ queue, process it with Flink operators, and write the result to another CMQ queue on the same or a different instance.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Supported |
| 1.13 | Supported |
| 1.14 | Unsupported |
| 1.16 | Unsupported |

Limits

CMQ can be used as a source or a sink for tuple streams. It does not support upsert streams currently.
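
To make the limit concrete, here is a minimal sketch, assuming that "tuple stream" means an append-only stream. The table names, the queue names, the `proc_time` column, and the one-minute window are hypothetical and chosen only for illustration. A plain `GROUP BY` aggregation produces an updating result, which a CMQ sink cannot accept under the limit above, whereas a windowed aggregation emits each result row once and is therefore append-only.

```sql
-- Hypothetical source table with a processing-time attribute for windowing.
CREATE TABLE `cmq_events` (
  `id` INT,
  `name` STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'source_queue_name',   -- hypothetical queue name
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'json'
);

-- Hypothetical sink table for per-window counts.
CREATE TABLE `cmq_counts` (
  `name` STRING,
  `cnt` BIGINT
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'sink_queue_name',     -- hypothetical queue name
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'json'
);

-- Append-only: one final row per name and one-minute window.
-- By contrast, "SELECT name, COUNT(*) ... GROUP BY name" without a window
-- would continuously update earlier results (an upsert-style stream).
INSERT INTO `cmq_counts`
SELECT `name`, COUNT(*) AS `cnt`
FROM `cmq_events`
GROUP BY `name`, TUMBLE(`proc_time`, INTERVAL '1' MINUTE);
```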

Defining a table in DDL

As a source

JSON format

CREATE TABLE `cmq_source_json_table` (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'json', -- The data format (JSON).
'json.fail-on-missing-field' = 'false', -- If this is 'false', missing fields are set to null instead of causing an error.
'json.ignore-parse-errors' = 'true', -- If this is 'true', fields that fail to parse are set to null and the job continues.
'batch-size' = '16', -- The number of messages consumed at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
);

CSV format

CREATE TABLE `cmq_source_csv_table` (
`id` int,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'csv', -- The data format (CSV).
'batch-size' = '16', -- The number of messages consumed at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
);

As a sink

JSON format

CREATE TABLE `cmq_sink_json_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'json', -- The data format (JSON).
'json.fail-on-missing-field' = 'false', -- If this is 'false', missing fields are set to null instead of causing an error.
'json.ignore-parse-errors' = 'true', -- If this is 'true', fields that fail to parse are set to null and the job continues.
'batch-size' = '16', -- The number of messages sent at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'retry-times' = '3', -- The number of retries in case of failure to send a message.
'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
);

CSV format

CREATE TABLE `cmq_sink_csv_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'csv', -- The data format (CSV).
'batch-size' = '16', -- The number of messages sent at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'retry-times' = '3', -- The number of retries in case of failure to send a message.
'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
);

WITH parameters

Common WITH parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | - | Here, it should be 'cmq'. |
| hosts | Yes | - | The name server of the queue's region. For details, see TCP SDK. |
| queue | Yes | - | The CMQ queue name. |
| secret-id | Yes | - | The account secret ID. |
| secret-key | Yes | - | The account secret key. |
| sign-method | No | HmacSHA1 | The signature algorithm. |
| format | Yes | - | The input and output format of CMQ messages. Valid values include 'csv' and 'json'. |
| batch-size | No | 16 | The number of messages sent/received at a time. |
| request-timeout | No | 5000ms | The request timeout period. |
| polling-wait-timeout | No | 10s | The time to wait in case of failure to obtain any data. |
| key-alive-timeout | No | 60s | The valid time period for the deduplication of CMQ messages with primary keys. This option ensures that the same message is consumed only once, but does not guarantee global uniqueness. |
| retry-times | No | 3 | The number of retries in case of failure to send a message. |
| max-block-timeout | No | 0s | The maximum time to wait to batch send data. If it is '0s', data will be sent immediately without waiting. |
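
As an illustration of how the sending options combine, the sketch below defines a sink that sends up to 16 messages per request and waits at most one second to fill a batch. The table name and the values '1s' and '5' are assumptions chosen for illustration; the duration syntax follows the values used elsewhere on this page.

```sql
CREATE TABLE `cmq_batched_sink_table` (   -- hypothetical table name
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'queue_name',
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'json',
  'batch-size' = '16',          -- Send up to 16 messages per request (the default).
  'max-block-timeout' = '1s',   -- Assumed value: wait up to 1 second to batch data before sending.
  'retry-times' = '5'           -- Assumed value: retry a failed send up to 5 times.
);
```

Leaving 'max-block-timeout' at its default of '0s' sends data immediately; a non-zero value waits longer before sending so that data can be sent in batches.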

WITH parameters for JSON

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| json.fail-on-missing-field | No | false | If this is true, the job will fail when a field is missing. If this is false (default), missing fields will be set to null and the job will continue to be executed. |
| json.ignore-parse-errors | No | false | If this is true, when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false, the job will fail in case of a parse error. |
| json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, and the format will be yyyy-MM-ddTHH:mm:ss.s{precision}. |
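
The following sketch shows one way the JSON options might be set for messages that carry ISO-8601 timestamps. The table name and the `event_time` column are hypothetical; the option names and values are the ones listed in the table above.

```sql
CREATE TABLE `cmq_source_iso_ts_table` (  -- hypothetical table name
  `id` INT,
  `name` STRING,
  `event_time` TIMESTAMP(3)               -- hypothetical column, e.g. "2023-11-08T14:58:27.123"
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'queue_name',
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601', -- Parse timestamps as yyyy-MM-ddTHH:mm:ss.s{precision}.
  'json.fail-on-missing-field' = 'false',        -- Missing fields are set to null.
  'json.ignore-parse-errors' = 'false'           -- A parse error fails the job (the default).
);
```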

WITH parameters for CSV

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| csv.field-delimiter | No | , | The field delimiter, which is a comma by default. |
| csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must use U&'\000A'). You can also set it to \r (in SQL, you need to use U&'\000D'). |
| csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used. |
| csv.quote-character | No | " | The quote character. Text inside quotes will be viewed as a whole. The default value is ". |
| csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure. |
| csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well). |
| csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
| csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
| csv.null-literal | No | - | The string that will be seen as null. |
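
As a sketch of how the CSV options fit together, the source below reads pipe-delimited messages and treats the literal string NULL as a null value. The table name, the '|' delimiter, and the 'NULL' literal are assumptions chosen for illustration.

```sql
CREATE TABLE `cmq_source_pipe_csv_table` (  -- hypothetical table name
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'queue_name',
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'csv',
  'csv.field-delimiter' = '|',        -- Assumed: fields are separated by '|' instead of ','.
  'csv.null-literal' = 'NULL',        -- Assumed: the string NULL is read as a null value.
  'csv.ignore-parse-errors' = 'true'  -- Fields that fail to parse are set to null.
);
```

With these settings, a message body such as 42|NULL would be expected to yield a row with id 42 and a null name.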

Example

CREATE TABLE `cmq_source_json_table` (
`id` int,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'json', -- The data format (JSON).
'json.fail-on-missing-field' = 'false', -- If this is 'false', missing fields are set to null instead of causing an error.
'json.ignore-parse-errors' = 'true', -- If this is 'true', fields that fail to parse are set to null and the job continues.
'batch-size' = '16', -- The number of messages consumed at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
);
CREATE TABLE `cmq_sink_json_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'cmq', -- Here, it should be 'cmq'.
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
'queue' = 'queue_name', -- The name of the CMQ queue.
'secret-id' = 'xxxx', -- The account secret ID.
'secret-key' = 'xxxx', -- The account secret key.
'sign-method' = 'HmacSHA1', -- The signature algorithm.
'format' = 'json', -- The data format (JSON).
'json.fail-on-missing-field' = 'false', -- If this is 'false', missing fields are set to null instead of causing an error.
'json.ignore-parse-errors' = 'true', -- If this is 'true', fields that fail to parse are set to null and the job continues.
'batch-size' = '16', -- The number of messages sent at a time.
'request-timeout' = '5000ms', -- The request timeout period.
'retry-times' = '3', -- The number of retries in case of failure to send a message.
'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
);
INSERT INTO `cmq_sink_json_table` SELECT * FROM `cmq_source_json_table`;

Notes

Please pay attention to the following when using CMQ as a source:
1. You can configure the primary key to achieve deduplication over a time period you specify. The longer this time period is, the higher the memory usage.
2. We strongly recommend you set the visibility timeout period of CMQ messages to a value larger than the checkpoint interval for your Flink job to prevent consumed messages from being consumed again.
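
The two notes above can be read together in a sketch like the following. The table name and all numbers are assumptions for illustration: a job checkpoint interval of 60 seconds and a CMQ visibility timeout of 120 seconds, both configured outside this DDL (in the job settings and on the CMQ queue respectively), and a deduplication window of 2 minutes.

```sql
CREATE TABLE `cmq_dedup_source_table` (  -- hypothetical table name
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED        -- Enables deduplication by `id`.
) WITH (
  'connector' = 'cmq',
  'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
  'queue' = 'queue_name',
  'secret-id' = 'xxxx',
  'secret-key' = 'xxxx',
  'format' = 'json',
  -- Assumed value: remember primary keys for 2 minutes.
  -- A longer window catches more duplicates but uses more memory.
  'key-alive-timeout' = '2min'
  -- Not set here: the queue's visibility timeout (assumed 120s) should exceed
  -- the job's checkpoint interval (assumed 60s), as recommended above.
);
```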