Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Unsupported |
-- RabbitMQ sink table that writes rows in JSON format.
-- Replace every 'xxxx' placeholder with the value for your own RMQ cluster.
-- Each option sits on its own line so that a trailing `--` comment cannot
-- swallow the options (or the closing parenthesis) that follow it.
CREATE TABLE `rmq_sink_json_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'rabbitmq',                -- Must be 'rabbitmq'.
    'host' = 'xxxx',                         -- The RMQ host.
    'port' = 'xxxx',                         -- The RMQ port.
    'vhost' = '/',                           -- The virtual host.
    'username' = 'xxxx',                     -- The username.
    'password' = 'xxxx',                     -- The password.
    'exchange' = 'exchange',                 -- The exchange name.
    'routing-key' = 'Key',                   -- The key bound by the exchange.
    'format' = 'json',                       -- The data format (JSON).
    'json.fail-on-missing-field' = 'false',  -- 'false': a missing field is set to null instead of failing the job.
    'json.ignore-parse-errors' = 'true'      -- 'true': a field that fails to parse is set to null and the job continues.
);
-- RabbitMQ sink table that writes rows in CSV format.
-- Replace every 'xxxx' placeholder with the value for your own RMQ cluster.
-- Each option sits on its own line so that a trailing `--` comment cannot
-- swallow the options (or the closing parenthesis) that follow it.
CREATE TABLE `rmq_sink_csv_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'rabbitmq',   -- Must be 'rabbitmq'.
    'host' = 'xxxx',            -- The RMQ host.
    'port' = 'xxxx',            -- The RMQ port.
    'vhost' = '/',              -- The virtual host.
    'username' = 'xxxx',        -- The username.
    'password' = 'xxxx',        -- The password.
    'exchange' = 'exchange',    -- The exchange name.
    'routing-key' = 'Key',      -- The key bound by the exchange.
    'format' = 'csv'            -- The data format (CSV). Last option: no trailing comma.
);
Option | Required | Default Value | Description |
connector | Yes | - | Here, it should be 'rabbitmq' . |
host | Yes | - | The host of the queue. |
port | Yes | 5672 | The RMQ port. |
vhost | Yes | / | The virtual host. |
username | Yes | guest | The username (role name) used to connect to RMQ. |
password | Yes | guest | The password of the role. |
queue | No | - | The queue name. |
exchange | Yes | - | The exchange name. |
routing-key | No | - | The key bound by the exchange. |
delivery-mode | No | 1 | Whether messages will be persistent. 1 : No; 2 : Yes. |
expiration | No | 86400000 | The message validity period (milliseconds), which is one day by default. |
network-recovery-interval | No | 30s | The network recovery interval. |
automatic-recovery | No | true | Whether to recover the RMQ connection automatically. Automatic recovery is enabled by default. |
topology-recovery | No | true | Whether to recover RMQ topology automatically. Automatic topology recovery is enabled by default. |
connection-timeout | No | 30s | The connection timeout period, which is 30 seconds by default. |
requested-frame-max | No | 0 | The maximum frame size (bytes) for the first request. If this is 0 , no limit will be set. |
requested-heartbeat | No | 60s | The heartbeat timeout period. |
prefetch-count | No | 0 | The maximum number of messages the server can send. If this is 0 , no limit will be set. This option is only supported by Flink 1.13. |
delivery-timeout | No | 30s | The commit timeout period. This option is only supported by Flink 1.13. |
format | Yes | - | The input and output format of RMQ messages. Valid values include 'csv' and 'json' . |
Option | Required | Default Value | Description |
json.fail-on-missing-field | No | false | If this is true, the job will fail when a field is missing. If this is false (default), missing fields will be set to null and the job will continue to run. |
json.ignore-parse-errors | No | false | If this is true , when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false , the job will fail in case of a parse error. |
json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL , in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision} . You can also set it to ISO-8601 , and the format will be yyyy-MM-ddTHH:mm:ss.s{precision} . |
Option | Required | Default Value | Description |
csv.field-delimiter | No | , | The field delimiter, which is comma by default. |
csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must write it as U&'\000A'). You can also set it to \r (in SQL, you must write it as U&'\000D'). This option is only supported by Flink 1.11. |
csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true , 'csv.quote-character' cannot be used. |
csv.quote-character | No | " | The quote character. Text enclosed in quote characters is treated as a single field value. The default value is " (double quote). |
csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true , fields will be set to null in case of parse failure. |
csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true , make sure you set csv.ignore-parse-errors to true as well). |
csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
csv.null-literal | No | - | The string that will be seen as null. |
-- Example job: read rows from a JDBC (MySQL) table and write them to RabbitMQ as JSON.
-- Please replace the parameter values below with the corresponding values for the cluster used.

-- Source: a JDBC table.
CREATE TABLE `rabbitmq_source_json_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'source_table_name',
    'username' = 'username',
    'password' = 'password'
);

-- Sink: a RabbitMQ exchange/queue, with rows serialized as JSON.
CREATE TABLE `rabbitmq_sink_json_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'rabbitmq',
    'host' = 'host',
    'port' = 'port',
    'vhost' = 'vhost',
    'username' = 'username',
    'password' = 'password',
    'queue' = 'queue-name',
    'exchange' = 'exchange',
    'routing-key' = 'key',
    'format' = 'json'
);

-- Columns are listed explicitly so that a schema change on the source
-- cannot silently alter what is written to the sink.
INSERT INTO rabbitmq_sink_json_table
SELECT
    `id`,
    `name`
FROM rabbitmq_source_json_table;
Was this page helpful?