Flink Version | Description |
1.11 | Unsupported |
1.13 | Supported (as a batch source, sink, and dimension table) |
1.14 | Supported (as a batch source, sink, and dimension table) |
1.16 | Unsupported |
```sql
CREATE TABLE `mysql_cdc` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',       -- Must be 'mysql-cdc' here.
  'hostname' = '9.134.34.15',      -- IP address of the MySQL database server.
  'port' = '3306',                 -- Integer port number of the MySQL database server.
  'username' = 'root',             -- MySQL user to connect as. Requires the SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD privileges.
  'password' = 'xxx',              -- Password of the MySQL user.
  'database-name' = 'test',        -- Name of the MySQL database to monitor.
  'table-name' = 'user_behavior'   -- Name of the MySQL table to monitor.
);

CREATE TABLE `pk_starrocks` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'load-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx',
  'sink.buffer-flush.interval-ms' = '15000',     -- Does not take effect when sink.semantic is exactly-once.
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true',  -- Should be 'true' here.
  -- 'sink.parallelism' = '1',
  'sink.max-retries' = '3',
  'sink.semantic' = 'exactly-once'
);

INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
```
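The example above writes with sink.semantic set to exactly-once and leaves sink.version at its default (AUTO). As a sketch of a more explicit variant, assuming your StarRocks cluster is version 2.4 or later, the table below pins sink.version to V2 (Stream Load transactions) and sets a label prefix. The Flink table name `pk_starrocks_eo` and the prefix value are hypothetical, and checkpointing must be enabled for the job, because in exactly-once mode data is flushed only when a checkpoint is performed.

```sql
-- Hypothetical Flink table name; connection values reuse the example above.
CREATE TABLE `pk_starrocks_eo` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'load-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx',
  -- Data is flushed only at checkpoints; sink.buffer-flush.* options are ignored.
  'sink.semantic' = 'exactly-once',
  -- V2 uses Stream Load transactions and requires StarRocks 2.4 or later.
  'sink.version' = 'V2',
  -- Hypothetical prefix; only characters in [-_A-Za-z0-9] are allowed.
  'sink.label-prefix' = 'oceanus_user_behavior',
  'sink.max-retries' = '3'
);
```

If you are unsure whether the cluster supports Stream Load transactions, leaving sink.version at AUTO lets the connector choose.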
Option | Required | Default Value | Data Type | Description |
connector | Yes | None | String | Here, it should be "starrocks". |
jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format of jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., such as jdbc:mysql://172.28.28.98:9030. |
load-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format of fe_ip1:http_port,fe_ip2:http_port..., such as 172.28.28.98:8030. |
database-name | Yes | None | String | The name of the StarRocks database into which you want to load data. |
table-name | Yes | None | String | The name of the StarRocks table into which you want to load data. |
username | Yes | None | String | The username of the account that you want to use to load data into StarRocks. |
password | Yes | None | String | The password of the preceding account. |
sink.semantic | No | at-least-once | String | Valid values: at-least-once and exactly-once. If set to exactly-once, data is flushed only when a checkpoint is performed, and the sink.buffer-flush.* options do not take effect. |
sink.version | No | AUTO | String | Valid values: V1: Uses the Stream Load interface to achieve exactly-once semantics. Its performance is relatively low, but it supports all StarRocks versions. V2: Uses Stream Load transactions to achieve exactly-once semantics. It requires StarRocks 2.4 or later. AUTO: Automatically chooses the sink version depending on whether the StarRocks version supports Stream Load transactions. |
sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | A flush option that takes effect only when sink.semantic is set to at-least-once. A flush to StarRocks is triggered when the size of data in the buffer exceeds this value. Value range: [64 MB, 10 GB]. |
sink.buffer-flush.max-rows | No | 500000 | String | A flush option that takes effect only when sink.semantic is set to at-least-once. A flush to StarRocks is triggered when the number of rows in the buffer exceeds this value. Value range: [64000, 5000000]. |
sink.buffer-flush.interval-ms | No | 300000 | String | A flush option that takes effect only when sink.semantic is set to at-least-once. The interval (in milliseconds) between two flushes to StarRocks. Value range: [1000, 3600000]. |
sink.max-retries | No | 3 | String | The maximum number of times a failed Stream Load job is retried. Value range: [0, 1000]. |
sink.parallelism | No | NULL | String | The parallelism of the connector. If not set, the global parallelism is used. |
sink.connect.timeout-ms | No | 1000 | String | The timeout (in milliseconds) for waiting for a response from load-url. Value range: [100, 60000]. |
sink.label-prefix | No | None | String | The label prefix used by Stream Load. Supported characters: [-_A-Za-z0-9]. For more information about labels, see Optional parameters of Stream Load. |
sink.properties.format | No | CSV | String | The format of the data to be loaded into StarRocks. Valid values: CSV (default) and JSON. |
sink.properties.column_separator | No | \t | String | The column separator for CSV-formatted data. |
sink.properties.row_delimiter | No | \n | String | The row delimiter for CSV-formatted data. |
sink.properties.strip_outer_array | No | false | String | Specifies whether to strip the outermost array structure of JSON-formatted data. Valid values: true and false (default). In batch loads from Flink, the JSON data may have an outermost array structure, indicated by a pair of square brackets []. In this situation, we recommend that you set this parameter to true, so that StarRocks removes the outermost square brackets [] and loads each inner element as a separate data record. If you set this parameter to false, StarRocks parses the entire JSON data into one array and loads the array as a single data record. For example, if the JSON data to be loaded is [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] and this parameter is set to true, StarRocks parses {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} into separate data records that are loaded into separate StarRocks table rows. For details, see JSON parameters. |
sink.properties.* | No | None | String | The parameters used to control Stream Load behavior, such as 'sink.properties.columns' = 'k1, v1'. From StarRocks v2.4 onwards, the Primary Key model supports updating specified columns. For more parameters, see STREAM LOAD. |
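To make the at-least-once flush options in the table above concrete, here is a minimal sketch of a sink that sets all three sink.buffer-flush.* thresholds explicitly, keeps the default CSV format, and overrides only the column separator. The Flink table name `pk_starrocks_alo` is hypothetical, the threshold values are illustrative picks within the documented ranges rather than recommendations, and the connection values are the same placeholders used in the first example.

```sql
-- Hypothetical Flink table name; connection values reuse the first example.
CREATE TABLE `pk_starrocks_alo` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'load-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx',
  'sink.semantic' = 'at-least-once',
  -- A flush is triggered as soon as any one of these three thresholds is reached.
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB, within [64 MB, 10 GB]
  'sink.buffer-flush.max-rows' = '100000',      -- within [64000, 5000000]
  'sink.buffer-flush.interval-ms' = '10000',    -- 10 s between flushes, within [1000, 3600000]
  -- CSV is the default format, so only the column separator is overridden.
  -- Pick a separator that never occurs in the data.
  'sink.properties.column_separator' = '|'
);
```

Smaller thresholds make data visible in StarRocks sooner at the cost of more, smaller Stream Load jobs.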
If sink.semantic is set to at-least-once, a flush to StarRocks is triggered as soon as any one of the conditions set by sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows, or sink.buffer-flush.interval-ms is met.

If sink.semantic is set to exactly-once, the sink relies on Flink checkpointing: the data and its labels are saved in each checkpoint. After a checkpoint completes, further writes are blocked while all the data cached in state is flushed to StarRocks in the first invocation, which guarantees exactly-once semantics. In this case, the sink.buffer-flush.* options do not take effect.

Flink Type | StarRocks Type |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
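As an illustration of the type mapping above, the following sketch declares a Flink sink table whose column types map to StarRocks types according to the table; each comment names the StarRocks type the column maps to. The Flink table `typed_sink`, the StarRocks table `typed_demo`, and all column names are hypothetical, and the StarRocks table is assumed to already exist with matching columns. Whether every type round-trips in your connector version is worth verifying on a test table first.

```sql
-- Hypothetical tables and columns; the StarRocks table oceanus.typed_demo is assumed to exist.
CREATE TABLE `typed_sink` (
  `id` BIGINT,                   -- BIGINT
  `price` DECIMAL(10, 2),        -- DECIMAL
  `name` VARCHAR(64),            -- STRING
  `dt` DATE,                     -- DATE
  `updated_at` TIMESTAMP(3),     -- DATETIME
  `tags` ARRAY<STRING>,          -- ARRAY<STRING>
  `attrs` MAP<STRING, STRING>,   -- string column holding JSON text
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'load-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'typed_demo',
  'username' = 'root',
  'password' = 'xxx',
  -- JSON format, as in the first example on this page.
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true'
);
```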
Option | Required | Default Value | Data Type | Description |
connector | Yes | None | String | Here, it should be "starrocks". |
scan-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format of fe_ip1:http_port,fe_ip2:http_port..., such as 172.28.28.98:8030. |
jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format of jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., such as jdbc:mysql://172.28.28.98:9030. |
username | Yes | None | String | The username of your StarRocks cluster account. |
password | Yes | None | String | The password of your StarRocks cluster account. |
database-name | Yes | None | String | The name of the StarRocks database to which the StarRocks table you want to read belongs. |
table-name | Yes | None | String | The name of the StarRocks table you want to read. |
scan.connect.timeout-ms | No | 1000 | String | The maximum amount of time (in milliseconds) after which the connection from the Flink connector to your StarRocks cluster times out. |
scan.params.keep-alive-min | No | 10 | String | The maximum amount of time (in minutes) during which the read task stays alive. |
scan.params.query-timeout-s | No | 600 (10 min) | String | The maximum amount of time (in seconds) after which the read task times out. |
scan.params.mem-limit-byte | No | 1073741824 (1 GB) | String | The maximum amount of memory allowed per query on each BE. |
scan.max-retries | No | 1 | String | The maximum number of times that the read task can be retried upon failures. |
lookup.cache.ttl-ms | No | 5000 | Long | The maximum amount of time (in milliseconds) after which the query cache of the dimension table times out. |
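For a source that reads a large table, the scan.* options above can be raised above their defaults. The following is a minimal sketch; the Flink table name `starrocks_scan` is hypothetical, the connection values are the same placeholders used elsewhere on this page, and the numbers are illustrative only. Suitable limits depend on table size and BE memory.

```sql
-- Hypothetical Flink table name; placeholder connection values.
CREATE TABLE `starrocks_scan` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'scan-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx',
  'scan.connect.timeout-ms' = '5000',           -- 5 s to connect instead of the default 1 s
  'scan.params.keep-alive-min' = '30',          -- keep the read task alive for up to 30 min
  'scan.params.query-timeout-s' = '1800',       -- 30 min instead of the default 10 min
  'scan.params.mem-limit-byte' = '2147483648',  -- 2 GB (2 * 1024^3 bytes) per query on each BE
  'scan.max-retries' = '3'                      -- retry a failed read task up to 3 times
);
```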
```sql
CREATE TABLE `starrocks` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',  -- query_port, the MySQL server port of the FE
  'scan-url' = '172.28.28.98:8030',               -- http_port
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx'
);

CREATE TABLE `print_sink` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT * FROM `starrocks`;
```
```sql
CREATE TABLE `starrocks` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',  -- query_port, the MySQL server port of the FE
  'scan-url' = '172.28.28.98:8030',               -- http_port
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx'
);

CREATE TABLE `datagen` (
  `user_id` BIGINT,
  `ts` AS PROCTIME(),
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '20'
);

CREATE TABLE `print_sink` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT a.user_id, b.item_id, b.behavior, a.ts
FROM `datagen` AS a
LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts AS b
ON a.user_id = b.user_id;
```
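The dimension-table example above uses the default lookup.cache.ttl-ms of 5000 ms. As a sketch, assuming the `datagen` and `print_sink` tables from that example are already defined, the variant below declares a hypothetical dimension table `starrocks_dim` with a 60-second cache TTL and runs the same lookup join against it. A longer TTL presumably means fewer lookup queries against StarRocks, at the cost of the join seeing dimension data that may be up to that old.

```sql
-- Hypothetical dimension table name; same connection values as the example above.
CREATE TABLE `starrocks_dim` (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'scan-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'pk_user_behavior',
  'username' = 'root',
  'password' = 'xxx',
  -- Cached lookup results expire after 60 s (default: 5000 ms).
  'lookup.cache.ttl-ms' = '60000'
);

-- Same lookup join as above, against the table with the longer cache TTL.
INSERT INTO `print_sink`
SELECT a.user_id, b.item_id, b.behavior, a.ts
FROM `datagen` AS a
LEFT JOIN `starrocks_dim` FOR SYSTEM_TIME AS OF a.ts AS b
ON a.user_id = b.user_id;
```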
StarRocks | Flink |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
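When reading, the Flink column types should follow the StarRocks-to-Flink mapping above. The sketch below assumes a hypothetical StarRocks table `oceanus.orders` with the columns listed in the leading comment; the Flink table name `starrocks_orders` is also hypothetical. Note in particular that a LARGEINT column is declared as STRING on the Flink side.

```sql
-- Hypothetical StarRocks table oceanus.orders, assumed to be defined with:
--   order_id LARGEINT, amount DECIMAL(10, 2), created_at DATETIME,
--   order_date DATE, note VARCHAR(255)
CREATE TABLE `starrocks_orders` (
  `order_id` STRING,        -- LARGEINT is read as STRING
  `amount` DECIMAL(10, 2),  -- DECIMAL
  `created_at` TIMESTAMP,   -- DATETIME
  `order_date` DATE,        -- DATE
  `note` STRING             -- VARCHAR
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
  'scan-url' = '172.28.28.98:8030',
  'database-name' = 'oceanus',
  'table-name' = 'orders',
  'username' = 'root',
  'password' = 'xxx'
);
```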