StarRocks
Last updated: 2023-11-08 14:24:26

Overview

The flink-connector-starrocks connector is based on the open-source starrocks-connector-for-apache-flink v1.2.4. Data can be read from and written to StarRocks via Flink.

Versions

| Flink Version | Description |
|---|---|
| 1.11 | Unsupported |
| 1.13 | Supported (as a batch source, sink, and dimension table) |
| 1.14 | Supported (as a batch source, sink, and dimension table) |
| 1.16 | Unsupported |

As a sink

The flink-connector-starrocks connector can be used as a sink to load data into StarRocks. It delivers higher and more stable performance than the flink-connector-jdbc connector provided by Apache Flink, because it buffers data and loads it into StarRocks in batches through the Stream Load transaction interface. Two data formats are supported: CSV and JSON.
The following example shows how to load data from a MySQL-CDC source to StarRocks.
CREATE TABLE `mysql_cdc` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
'hostname' = '9.134.34.15', -- IP of the database server.
'port' = '3306', -- Integer port number of the database server.
'username' = 'root', -- Username to use when connecting to the MySQL database server (the SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'xxx', -- Password to use when connecting to the MySQL database server.
'database-name' = 'test', -- Database name of the MySQL server to monitor.
'table-name' = 'user_behavior' -- Table name of the MySQL database to monitor.
);

CREATE TABLE `pk_starrocks`(
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
'load-url' = '172.28.28.98:8030',
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx',
'sink.buffer-flush.interval-ms' = '15000',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true', -- Here, it should be "true".
-- 'sink.parallelism' = '1',
'sink.max-retries' = '3',
'sink.semantic' = 'exactly-once'
);

INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
Note
The StarRocks table needs to use the Primary Key model. Otherwise, data deletion from the source table cannot be synced to StarRocks.
You can use the StarRocks Migration Tool (SMT) provided by StarRocks to sync the database and table schema to StarRocks as instructed in Synchronize database & table schema.
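For reference, the StarRocks-side table that the pk_starrocks sink above writes into could look like the following minimal sketch. This DDL is executed in StarRocks (not in Flink), and the bucket count is illustrative; the key point is that the table is created with the Primary Key model.
CREATE TABLE `pk_user_behavior` (
`user_id` BIGINT NOT NULL, -- Primary Key columns must be NOT NULL and listed first.
`item_id` BIGINT,
`behavior` STRING
) PRIMARY KEY (`user_id`)
DISTRIBUTED BY HASH (`user_id`) BUCKETS 4; -- The bucket count here is illustrative; adjust to your cluster.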

WITH parameters

| Option | Required | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Here, it should be "starrocks". |
| jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., such as jdbc:mysql://172.28.28.98:9030. |
| load-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format fe_ip1:http_port,fe_ip2:http_port..., such as 172.28.28.98:8030. |
| database-name | Yes | None | String | The name of the StarRocks database into which you want to load data. |
| table-name | Yes | None | String | The name of the StarRocks table into which you want to load data. |
| username | Yes | None | String | The username of the account that you want to use to load data into StarRocks. |
| password | Yes | None | String | The password of the preceding account. |
| sink.semantic | No | at-least-once | String | Valid values: at-least-once and exactly-once. With exactly-once, data is flushed only when a checkpoint is performed, and the sink.buffer-flush.* options are ignored. |
| sink.version | No | AUTO | String | Valid values: V1 (uses the Stream Load interface for exactly-once processing; relatively low performance, but supports all StarRocks versions), V2 (uses the Stream Load transaction interface for exactly-once processing; requires StarRocks 2.4 or later), and AUTO (automatically chooses the version depending on whether the StarRocks version supports Stream Load transactions). |
| sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | A flush option, available only when sink.semantic is set to at-least-once. When the size of the data in the buffer exceeds this value, a flush to StarRocks is triggered. Value range: [64 MB, 10 GB]. |
| sink.buffer-flush.max-rows | No | 500000 | String | A flush option, available only when sink.semantic is set to at-least-once. When the number of rows in the buffer exceeds this value, a flush to StarRocks is triggered. Value range: [64000, 5000000]. |
| sink.buffer-flush.interval-ms | No | 300000 | String | A flush option, available only when sink.semantic is set to at-least-once. The interval (in milliseconds) between two flushes to StarRocks. Value range: [1000, 3600000]. |
| sink.max-retries | No | 3 | String | The number of times that the system retries a Stream Load job. Value range: [0, 1000]. |
| sink.parallelism | No | NULL | String | The parallelism of the connector. If no value is set, the global parallelism is used. |
| sink.connect.timeout-ms | No | 1000 | String | The timeout (in milliseconds) for waiting for a response from the load-url. Value range: [100, 60000]. |
| sink.label-prefix | No | None | String | The label prefix used by Stream Load. Supported characters: [-_A-Za-z0-9]. For more details on labels, see the optional parameters of Stream Load. |
| sink.properties.format | No | CSV | String | The format of the data to be loaded into StarRocks. Valid values: CSV (default) and JSON. |
| sink.properties.column_separator | No | \t | String | The column separator for CSV-formatted data. For details, see CSV parameters. |
| sink.properties.row_delimiter | No | \n | String | The row delimiter for CSV-formatted data. For details, see CSV parameters. |
| sink.properties.strip_outer_array | No | false | String | Specifies whether to strip the outermost array structure for JSON-formatted data. Valid values: true and false (default). In batch loads from Flink, the JSON data may have an outermost array structure indicated by a pair of square brackets []. In this situation, we recommend that you set this parameter to true so that StarRocks removes the outermost square brackets and loads each inner element as a separate data record. If you set this parameter to false, StarRocks parses the entire JSON data as one array and loads it as a single data record. For example, if the JSON data to be loaded is [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] and this parameter is set to true, StarRocks parses {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} into separate records that are loaded into separate StarRocks table rows. For details, see JSON parameters. |
| sink.properties.* | No | None | String | Parameters used to control Stream Load behavior, such as 'sink.properties.columns' = 'k1, v1'. From StarRocks v2.4 onwards, the Primary Key model supports updating specified columns. For more parameters, see STREAM LOAD. |
Note
If you set sink.semantic to at-least-once, a flush to StarRocks is triggered as soon as any of sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows, or sink.buffer-flush.interval-ms is reached.
If you set sink.semantic to exactly-once, flushing depends on Flink checkpointing: the data and its labels are saved to state during each checkpoint; after the checkpoint completes, further writes to the database are blocked and all data cached in state is flushed to the sink in the first invocation, which guarantees exactly-once semantics. In this situation, the sink.buffer-flush.* options are ignored.
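For example, the following minimal sketch shows an at-least-once sink whose flush is triggered by whichever of the three thresholds is reached first (the table name, addresses, credentials, and threshold values are illustrative):
CREATE TABLE `starrocks_at_least_once` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
'load-url' = '172.28.28.98:8030',
'database-name' = 'oceanus',
'table-name' = 'user_behavior_sink',
'username' = 'root',
'password' = 'xxx',
'sink.semantic' = 'at-least-once', -- The flush options below take effect only with at-least-once.
'sink.buffer-flush.max-bytes' = '67108864', -- Flush when the buffer reaches 64 MB ...
'sink.buffer-flush.max-rows' = '100000', -- ... or 100,000 rows ...
'sink.buffer-flush.interval-ms' = '10000' -- ... or every 10 seconds, whichever comes first.
);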

Data type mapping (as a sink)

| Flink Type | StarRocks Type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
Note: BYTES, VARBINARY, TIME, INTERVAL, MULTISET, and RAW in Flink are not supported. For details, see Data Types.

Notes

StarRocks data models

StarRocks provides four data models as shown in Data Models: Duplicate Key, Aggregate Key, Unique Key, and Primary Key. The Primary Key model supports the pushdown of predicates and indexes. As such, the Primary Key model can deliver high query performance despite real-time and frequent data updates. Therefore, if you have no special needs, the Primary Key model is recommended.
For upsert streams, the Primary Key model must be used. Otherwise, DELETE messages cannot be flushed to StarRocks.
Compared with the Unique Key model based on the Merge-On-Read policy, the Primary Key model improves the query performance by 3 to 10 times.
The Primary Key model can join multiple streams by performing update operations on individual columns.
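The last point can be illustrated with a partial-column update. The following sketch assumes a StarRocks version that supports partial updates on Primary Key tables and passes the Stream Load partial_update parameter through sink.properties.*; it updates only the behavior column of pk_user_behavior, leaving the other columns untouched (addresses and credentials are the same placeholders as in the earlier sink example):
CREATE TABLE `pk_starrocks_partial` (
`user_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
'load-url' = '172.28.28.98:8030',
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx',
'sink.properties.partial_update' = 'true', -- Stream Load partial update; requires a Primary Key table.
'sink.properties.columns' = 'user_id,behavior' -- The key column plus the columns to update.
);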

As a source

WITH parameters

| Option | Required | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Here, it should be "starrocks". |
| scan-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format fe_ip1:http_port,fe_ip2:http_port..., such as 172.28.28.98:8030. |
| jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., such as jdbc:mysql://172.28.28.98:9030. |
| username | Yes | None | String | The username of your StarRocks cluster account. |
| password | Yes | None | String | The password of your StarRocks cluster account. |
| database-name | Yes | None | String | The name of the StarRocks database to which the StarRocks table you want to read belongs. |
| table-name | Yes | None | String | The name of the StarRocks table you want to read. |
| scan.connect.timeout-ms | No | 1000 | String | The maximum amount of time (in milliseconds) after which the connection from the Flink connector to your StarRocks cluster times out. |
| scan.params.keep-alive-min | No | 10 | String | The maximum amount of time (in minutes) that the read task is kept alive. |
| scan.params.query-timeout-s | No | 600 (10 minutes) | String | The maximum amount of time (in seconds) after which the read task times out. |
| scan.params.mem-limit-byte | No | 1073741824 (1 GB) | String | The maximum amount of memory allowed per query on each BE. |
| scan.max-retries | No | 1 | String | The maximum number of times that the read task can be retried upon failures. |
| lookup.cache.ttl-ms | No | 5000 | Long | The maximum amount of time (in milliseconds) after which the lookup cache of the dimension table expires. |

As a batch source

CREATE TABLE `starrocks` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks' ,
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
'scan-url' = '172.28.28.98:8030', -- http_port
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx'
);

CREATE TABLE `print_sink` (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT * FROM starrocks;

As a dimension table

CREATE TABLE `starrocks` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks' ,
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
'scan-url' = '172.28.28.98:8030', -- http_port
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx'
);

CREATE TABLE `datagen` (
`user_id` BIGINT,
`ts` as PROCTIME(),
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '20'
);

CREATE TABLE `print_sink` (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT a.user_id,b.item_id,b.behavior,a.ts
FROM `datagen` a LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts as b
ON a.user_id = b.user_id;
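If the same dimension key is looked up frequently, the lookup cache can reduce point queries against StarRocks. The following minimal sketch simply adds lookup.cache.ttl-ms to the dimension table definition above (the 10-second TTL and the table name are illustrative):
CREATE TABLE `starrocks_dim_cached` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
'scan-url' = '172.28.28.98:8030',
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx',
'lookup.cache.ttl-ms' = '10000' -- Cached lookup results expire after 10 seconds.
);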

Data type mapping (as a source)

| StarRocks Type | Flink Type |
|---|---|
| NULL | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| DECIMALV2 | DECIMAL |
| DECIMAL32 | DECIMAL |
| DECIMAL64 | DECIMAL |
| DECIMAL128 | DECIMAL |
| CHAR | CHAR |
| VARCHAR | STRING |



