tencent cloud

文档反馈

数据库 StarRocks

最后更新时间:2023-11-08 14:24:42

    介绍

    flink-connector-starrocks 基于开源的 starrocks-connector-for-apache-flink v1.2.4 实现,支持通过 Flink 读写 StarRocks。

    版本说明

    Flink 版本
    说明
    1.11
    不支持
    1.13
    支持(批数据源、维表、目的表)
    1.14
    支持(批数据源、维表、目的表)
    1.16
    不支持

    作为数据目的(Sink)

    flink-connector-starrocks 作为数据目的,用于导入数据至 StarRocks,相比于 Flink 官方提供的 flink-connector-jdbc,导入性能更佳,flink-connector-starrocks 的内部实现是通过缓存并批量由 Stream Load 导入。支持 cvs、json 两种数据格式。
    以下为 MySQL-CDC 数据实时导入 StarRocks 的示例。
    CREATE TABLE `mysql_cdc` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
    'hostname' = '9.134.34.15', -- 数据库的 IP
    'port' = '3306', -- 数据库的访问端口
    'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
    'password' = 'xxx', -- 数据库访问的密码
    'database-name' = 'test', -- 需要同步的数据库
    'table-name' = 'user_behavior' -- 需要同步的数据表名
    );
    
    CREATE TABLE `pk_starrocks`(
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
    'load-url' = '172.28.28.98:8030',
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx',
    'sink.buffer-flush.interval-ms' = '15000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true', -- 需要设置为 true
    -- 'sink.parallelism' = '1',
    'sink.max-retries' = '3',
    'sink.semantic' = 'exactly-once'
    );
    
    INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
    说明:
    StarRocks 表须使用 主键模型,否则源表数据删除时,无法同步到 StarRocks。
    可用 StarRocks 的 SMT 工具,同步库表结构 到 StarRocks。

    WITH 参数

    参数
    必填
    默认值
    数据类型
    描述
    connector
    YES
    NONE
    String
    固定值 starrocks
    jdbc-url
    YES
    NONE
    String
    fe 节点 query_port: jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port... ,例如 jdbc:mysql://172.28.28.98:9030
    load-url
    YES
    NONE
    String
    fe 节点 http_port:  fe_ip1:http_port,fe_ip2:http_port.... ,例如 172.28.28.98:8030
    database-name
    YES
    NONE
    String
    starrocks 数据库名
    table-name
    YES
    NONE
    String
    starrocks 表名
    username
    YES
    NONE
    String
    starrocks 用户名
    password
    YES
    NONE
    String
    starrocks 用户密码
    sink.semantic
    NO
    at-least-once
    String
    可选值:
    at-least-once 
    exactly-once ,只在 checkpoint 时写数据。注意此时 sink.buffer-flush.* 相关参数无效
    sink.version
    NO
    AUTO
    String
    可选值:
    V1:使用 stream-load 实现 exactly-once 语义,性能比较低,适用所有的 starrocks 版本
    V2:使用 transaction-load 实现 exactly-once 语义,适用 starrocks 2.4 及之后的版本
    AUTO:根据 starrocks 是否支持 transaction-load 自动选择
    sink.buffer-flush.max-bytes
    NO
    94371840(90M)
    String
    批量写入参数,sink.semanticat-least-once 时有效。当 buffer 中数据大小超过设置值后,触发写入 StarRocks。取值范围[64MB, 10GB]
    sink.buffer-flush.max-rows
    NO
    500000
    String
    批量写入参数,sink.semanticat-least-once时有效。当 buffer 中数据条数超过设置值后,触发写入 StarRocks。取值范围[64,000, 5000,000]
    sink.buffer-flush.interval-ms
    NO
    300000
    String
    批量写入参数,sink.semanticat-least-once 时有效。间隔固定的周期(毫秒)触发写入 StarRocks。取值范围 [1000, 3600000]
    sink.max-retries
    NO
    3
    String
    写 StarRocks 的 stream load 请求的重试次数,取值范围 [0, 1000]
    sink.parallelism
    NO
    NULL
    String
    单独指定 sink 的并行度,不设置则使用全局并行度
    sink.connect.timeout-ms
    NO
    1000
    String
    连接 load-url 的超时时间(毫秒),取值范围[100, 60000]
    sink.label-prefix
    NO
    NO
    String
    stream load label 的前缀,合法字符[-_A-Za-z0-9]。关于 lable,参考 stream load 可选参数
    sink.properties.format
    NO
    CSV
    String
    导入 StarRocks 的数据格式,可选值 CSV 和 JSON,默认为 CSV
    sink.properties.column_separator
    NO
    \\t
    String
    用于指定 CSV 格式的列分隔符,参考 CSV 适用参数
    sink.properties.row_delimiter
    NO
    \\n
    String
    用于指定 CSV 格式的行分隔符,参考 CSV 适用参数
    sink.properties.strip_outer_array
    NO
    false
    String
    用于 JSON 格式,指定是否裁剪最外层的数组结构。取值范围:truefalse。默认值:false。Flink 批量导入 JSON 数据在最外层有一对表示数组结构的中括号 []。因此,需要您指定该参数取值为 true,这样 StarRocks 会剪裁掉外层的中括号 [],并把中括号 [] 里的每个内层数组都作为一行单独的数据导入。如果您指定该参数取值为 false,则 StarRocks 会把整个 JSON 数据文件解析成一个数组,并作为一行数据导入。例如,待导入的 JSON 数据为 [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ],如果指定该参数取值为 true,则 StarRocks 会把 {"category" : 1, "author" : 2}{"category" : 3, "author" : 4} 解析成两行数据,并导入到目标 StarRocks 表中对应的数据行。参考 JSON 适用参数
    sink.properties.*
    NO
    NONE
    String
    stream load 属性,例如 'sink.properties.columns' = 'k1, v1。自 StarRocks 2.4 开始,主键模型 支持部分列更新。更多参数请参考 stream load 文档
    说明:
    sink.semanticat-least-once时,sink.buffer-flush.max-bytes、sink.buffer-flush.max-rows、sink.buffer-flush.interval-ms 任意条件满足时触发 StarRocks 写操作。
    sink.semanticexactly-once时,依赖 flink 的 checkpoint,在每次 checkpoint 时保存批数据以及其 label,在checkpoint 完成后的第一次 invoke 中阻塞 flush 所有缓存在 state 当中的数据,以此达到精准一次。此时 sink.buffer-flush.* 相关参数无效。

    类型映射(Sink)

    Flink type
    StarRocks type
    BOOLEAN
    BOOLEAN
    TINYINT
    TINYINT
    SMALLINT
    SMALLINT
    INTEGER
    INTEGER
    BIGINT
    BIGINT
    FLOAT
    FLOAT
    DOUBLE
    DOUBLE
    DECIMAL
    DECIMAL
    BINARY
    INT
    CHAR
    STRING
    VARCHAR
    STRING
    STRING
    STRING
    DATE
    DATE
    TIMESTAMP_WITHOUT_TIME_ZONE(N)
    DATETIME
    TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)
    DATETIME
    ARRAY<T>
    ARRAY<T>
    MAP<KT,VT>
    JSON STRING
    ROW<arg T...>
    JSON STRING
    注意:当前不支持 Flink 的 BYTES、VARBINARY、TIME、INTERVAL、MULTISET、RAW,具体可参考 Flink 数据类型

    注意事项

    StarRocks 数据模型

    StarRocks 支持四种 数据模型:明细模型、聚合模型、更新模型、主键模型。主键模型支持谓词和索引下推,能够在支持实时和频繁更新等场景的同时,提供高效查询。如果没有特殊需求,推荐使用主键模型。
    对于 Upsert 流,需要使用主键模型,否则 DELETE 消息无法写入 StarRocks。
    相对于 Merge-On-Read 策略的更新模型,主键模型的查询性能能够提升 3~10 倍。
    主键模型可利用部分列更新实现多流 JOIN。

    作为数据源(Source)

    WITH 参数

    参数
    必填
    默认值
    数据类型
    描述
    connector
    YES
    NONE
    String
    固定值 starrocks
    scan-url
    YES
    NONE
    String
    fe 节点 http_port: fe_ip1:http_port,fe_ip2:http_port....,例如 172.28.28.98:8030
    jdbc-url
    YES
    NONE
    String
    fe 节点 query_port: jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...,例如 jdbc:mysql://172.28.28.98:9030
    username
    YES
    NONE
    String
    StarRocks 用户名
    password
    YES
    NONE
    String
    StarRocks 用户密码
    database-name
    YES
    NONE
    String
    StarRocks 数据库名
    table-name
    YES
    NONE
    String
    StarRocks 表名
    scan.connect.timeout-ms
    NO
    1000
    String
    网络连接超时时间(毫秒)
    scan.params.keep-alive-min
    NO
    10
    String
    最大 keep alive 时间(分钟)
    scan.params.query-timeout-s
    NO
    600(5min)
    String
    单次查询超时时间(秒)
    scan.params.mem-limit-byte
    NO
    102410241024(1G)
    String
    单次查询内存限制
    scan.max-retries
    NO
    1
    String
    重试次数
    lookup.cache.ttl-ms
    NO
    5000
    Long
    维表查询的 cache 超时时间

    批数据源

    CREATE TABLE `starrocks` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks' ,
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
    'scan-url' = '172.28.28.98:8030', -- http_port
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx'
    );
    
    CREATE TABLE `print_sink` (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'logger'
    );
    
    INSERT INTO `print_sink`
    SELECT * FROM starrocks;

    维表

    CREATE TABLE `starrocks` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks' ,
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
    'scan-url' = '172.28.28.98:8030', -- http_port
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx'
    );
    
    CREATE TABLE `datagen` (
    `user_id` BIGINT,
    `ts` as PROCTIME(),
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '20'
    );
    
    CREATE TABLE `print_sink` (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'logger'
    );
    
    INSERT INTO `print_sink`
    SELECT a.user_id,b.item_id,b.behavior,a.ts
    FROM `datagen` a LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts as b
    ON a.user_id = b.user_id;

    类型映射(source)

    StarRocks
    Flink
    NULL
    NULL
    BOOLEAN
    BOOLEAN
    TINYINT
    TINYINT
    SMALLINT
    SMALLINT
    INT
    INT
    BIGINT
    BIGINT
    LARGEINT
    STRING
    FLOAT
    FLOAT
    DOUBLE
    DOUBLE
    DATE
    DATE
    DATETIME
    TIMESTAMP
    DECIMAL
    DECIMAL
    DECIMALV2
    DECIMAL
    DECIMAL32
    DECIMAL
    DECIMAL64
    DECIMAL
    DECIMAL128
    DECIMAL
    CHAR
    CHAR
    VARCHAR
    STRING
    
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持