Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Unsupported |
-- Example: CMQ source table that reads JSON-formatted messages.
CREATE TABLE `cmq_source_json_table` (
    `id` INT,
    `name` STRING,
    -- Declare a primary key only if duplicates should be removed; it identifies each record.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'json',                                         -- The data format (JSON).
    'json.fail-on-missing-field' = 'false',                    -- 'false': missing fields are set to null instead of failing the job.
    'json.ignore-parse-errors' = 'true',                       -- 'true': fields that fail to parse are set to null and the job continues.
    'batch-size' = '16',                                       -- The number of messages consumed at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'polling-wait-timeout' = '10s',                            -- The time to wait when no data could be obtained.
    'key-alive-timeout' = '5min'                               -- How long primary keys are retained for CMQ message deduplication.
);
-- Example: CMQ source table that reads CSV-formatted messages.
CREATE TABLE `cmq_source_csv_table` (
    `id` INT,
    `name` STRING,
    -- Declare a primary key only if duplicates should be removed; it identifies each record.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'csv',                                          -- The data format (CSV).
    'batch-size' = '16',                                       -- The number of messages consumed at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'polling-wait-timeout' = '10s',                            -- The time to wait when no data could be obtained.
    'key-alive-timeout' = '5min'                               -- How long primary keys are retained for CMQ message deduplication.
);
-- Example: CMQ sink table that writes JSON-formatted messages.
CREATE TABLE `cmq_sink_json_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'json',                                         -- The data format (JSON).
    'json.fail-on-missing-field' = 'false',                    -- 'false': missing fields are set to null instead of failing the job.
    'json.ignore-parse-errors' = 'true',                       -- 'true': fields that fail to parse are set to null and the job continues.
    'batch-size' = '16',                                       -- The number of messages sent at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'retry-times' = '3',                                       -- The number of retries when sending a message fails.
    'max-block-timeout' = '0s'                                 -- The maximum time to wait to batch send data ('0s' sends immediately).
);
-- Example: CMQ sink table that writes CSV-formatted messages.
CREATE TABLE `cmq_sink_csv_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'csv',                                          -- The data format (CSV).
    'batch-size' = '16',                                       -- The number of messages sent at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'retry-times' = '3',                                       -- The number of retries when sending a message fails.
    'max-block-timeout' = '0s'                                 -- The maximum time to wait to batch send data ('0s' sends immediately).
);
Option | Required | Default Value | Description |
connector | Yes | - | Here, it should be 'cmq' . |
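hosts | Yes | - | The name server of the CMQ queue's region, for example 'http://cmq-nameserver-vpc-gz.api.tencentyun.com' . |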
queue | Yes | - | The CMQ queue name. |
secret-id | Yes | - | The account secret ID. |
secret-key | Yes | - | The account secret key. |
sign-method | No | HmacSHA1 | The signature algorithm. |
format | Yes | - | The input and output format of CMQ messages. Valid values include 'csv' and 'json' . |
batch-size | No | 16 | The number of messages sent/received at a time. |
request-timeout | No | 5000ms | The request timeout period. |
polling-wait-timeout | No | 10s | The time to wait in case of failure to obtain any data. |
key-alive-timeout | No | 60s | The valid time period for the deduplication of CMQ messages with primary keys. This option ensures that the same message is consumed only once, but does not guarantee global uniqueness. |
retry-times | No | 3 | The number of retries in case of failure to send a message. |
max-block-timeout | No | 0s | The maximum time to wait to batch send data. If it is '0s' , data will be sent immediately without waiting. |
Option | Required | Default Value | Description |
json.fail-on-missing-field | No | false | If this is true , the job will fail when a field is missing. If this is false (default), missing fields will be set to null and the job will continue to run. |
json.ignore-parse-errors | No | false | If this is true , when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false , the job will fail in case of a parse error. |
json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL , in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision} . You can also set it to ISO-8601 , and the format will be yyyy-MM-ddTHH:mm:ss.s{precision} . |
Option | Required | Default Value | Description |
csv.field-delimiter | No | , | The field delimiter, which is comma by default. |
csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must use U&'\000A' ). You can also set it to \r (in SQL, you need to use U&'\000D' ). |
csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true , 'csv.quote-character' cannot be used. |
csv.quote-character | No | " | The quote character. Text enclosed in quote characters is treated as a single value. The default value is " . |
csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true , fields will be set to null in case of parse failure. |
csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true , make sure you set csv.ignore-parse-errors to true as well). |
csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
csv.null-literal | No | - | The string that will be seen as null. |
-- End-to-end example: read JSON messages from one CMQ queue and write them to another.

-- Source: CMQ queue consumed as JSON.
CREATE TABLE `cmq_source_json_table` (
    `id` INT,
    `name` STRING,
    -- Declare a primary key only if duplicates should be removed; it identifies each record.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'json',                                         -- The data format (JSON).
    'json.fail-on-missing-field' = 'false',                    -- 'false': missing fields are set to null instead of failing the job.
    'json.ignore-parse-errors' = 'true',                       -- 'true': fields that fail to parse are set to null and the job continues.
    'batch-size' = '16',                                       -- The number of messages consumed at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'polling-wait-timeout' = '10s',                            -- The time to wait when no data could be obtained.
    'key-alive-timeout' = '5min'                               -- How long primary keys are retained for CMQ message deduplication.
);

-- Sink: CMQ queue written as JSON.
CREATE TABLE `cmq_sink_json_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'cmq',                                       -- Must be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name',                                    -- The name of the CMQ queue.
    'secret-id' = 'xxxx',                                      -- The account secret ID.
    'secret-key' = 'xxxx',                                     -- The account secret key.
    'sign-method' = 'HmacSHA1',                                -- The signature algorithm.
    'format' = 'json',                                         -- The data format (JSON).
    'json.fail-on-missing-field' = 'false',                    -- 'false': missing fields are set to null instead of failing the job.
    'json.ignore-parse-errors' = 'true',                       -- 'true': fields that fail to parse are set to null and the job continues.
    'batch-size' = '16',                                       -- The number of messages sent at a time.
    'request-timeout' = '5000ms',                              -- The request timeout period.
    'retry-times' = '3',                                       -- The number of retries when sending a message fails.
    'max-block-timeout' = '0s'                                 -- The maximum time to wait to batch send data ('0s' sends immediately).
);

-- Copy every record from the source queue to the sink queue.
-- Columns are listed explicitly so the statement keeps working if either schema gains a column.
INSERT INTO `cmq_sink_json_table`
SELECT
    `id`,
    `name`
FROM `cmq_source_json_table`;
Was this page helpful?