tencent cloud

文档反馈

数据库 Redis

最后更新时间:2023-11-08 14:54:39

    介绍

    Redis Connector 提供了对 Redis 写入和维表的支持。

    版本说明

    Flink 版本
    说明
    1.11
    支持(写入,维表)
    1.13
    支持(写入,维表)
    1.14
    支持(写入、维表)
    1.16
    支持(写入、维表)

    使用范围

    可以作为维表的使用。
    可以作为 Tuple、Upsert 数据流的目的表。

    DDL 定义

    set 命令(字符串键)

    -- 第1列为 key,第2列为 value。Redis 命令为 set key value
    CREATE TABLE `redis_set_sink_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'set', -- set 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>'
    );

    lpush 命令(列表键)

    -- 第1列为 key,第2列为 value。Redis 命令为 lpush key value
    CREATE TABLE `redis_lpush_sink_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'lpush', -- lpush 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>'
    );
    

    sadd 命令(集合键)

    -- 第1列为 key,第2列为 value。Redis 命令为 sadd key value
    CREATE TABLE `redis_sadd_sink_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'sadd', -- sadd 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>'
    );
    

    hset 命令(哈希键)

    -- 第1列为 hash_key,第2列为 hash_value。Redis 命令为 hset key hash_key hash_value。
    CREATE TABLE `redis_hset_sink_table` (
    `hash_key` STRING,
    `hash_value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'hset', -- hset 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>',
    'additional-key' = '<key>' -- 哈希键
    );
    

    HSET_WITH_KEY 命令(哈希键)

    -- 第1列为 key, 第2列为 hash_key,第3列为 hash_value。Redis 命令为 hset key hash_key hash_value。
    CREATE TABLE `redis_hset_sink_table` (
    `key` STRING,
    `hash_key` STRING,
    `hash_value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'hset', -- hset 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>'
    );
    

    hmset 命令

    -- 第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_value
    CREATE TABLE `redis_hmset_sink_table` (
    `key` STRING,
    `fieldKey` STRING,
    `fieldValue` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'hmset', -- hmset 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>'
    );

    zadd 命令(有序集合键)

    -- 第1列为 hash_key,第2列为 hash_value。zadd key score value。
    CREATE TABLE `redis_zadd_sink_table` (
    `value` STRING,
    `score` DOUBLE
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'zadd', -- zadd 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>',
    'additional-key' = '<key>' -- 有序集合键
    );
    

    ZADD_WITH_KEY 命令(有序集合键)

    -- 第1列为 key, 第2列为 value,第3列为 score。zadd key score value。
    CREATE TABLE `redis_zadd_sink_table` (
    `key` STRING,
    `value` STRING,
    `score` DOUBLE
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'zadd', -- zadd 命令
    'nodes' = '<host>:<port>', -- redis 连接地址
    'password' = '<password>',
    'database' = '<database>',
    );

    get 命令(维表)

    CREATE TABLE `redis_dimension_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis',
    'command' = 'get', -- get命令
    'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。
    'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。
    'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。
    'lookup.max-retries' = '5', -- 查询失败,最多重试次数。
    -- 'password' = '<password>', -- 可选参数,密码
    -- 'database' = '<database>', -- 可选参数,数据库:默认0
    -- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式)
    );

    WITH 参数

    参数值
    必填
    默认值
    描述
    connector
    -
    固定值为 redis
    command
    -
    操作命令。取值与对应的键类型如下:
    set:字符串键
    lpush:类别键
    sadd:集合键
    hset:哈希键
    hset_with_key:哈希键
    zadd:有序集合键
    zadd_with_key:有序集合键
    nodes
    -
    redis server 连接地址,示例:127.0.0.1:6379。集群架构下多个节点使用','分隔
    password
    redis 密码,默认值为空,不进行权限验证
    database
    0
    要操作的数据库的 DB number,默认值0
    redis-mode
    standalone
    redis 部署模式
    standalone:标准架构,单机
    cluster:集群架构,分布式
    ignore-delete
    false
    是否忽略 Retraction 消息
    additional-ttl
    -
    过期时间,单位:秒。示例:60,设置过期是时间为60秒。只有 set 命令支持设置过期时间
    additional-key
    -
    -
    用于指定 hset 和 zadd 的 key。执行 hset 和 zadd 命令时必须设置
    lookup.cache.max-rows
    -
    查询缓存中最多缓存的数据条数
    lookup.cache.ttl
    10s
    每条记录的最长缓存时间
    lookup.max-retries
    1
    查询失败最多重试次数
    lookup.cache.caching-missing-key
    true
    是否缓存空结果

    代码示例

    CREATE TABLE datagen_source_table (
    id INT,
    name STRING
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1' -- 每秒产生的数据条数
    );
    
    CREATE TABLE `redis_set_sink_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到 Redis
    'command' = 'set', -- set 命令
    'nodes' = '127.0.0.1:8121', -- redis 连接地址
    'password' = '<password>',
    'database' = '0'
    );
    insert into redis_set_sink_table select cast(id as string), name from datagen_source_table;

    注意事项

    1. 自建 Redis 的集群模式不支持多数据库,集群模式下 database 参数无效。
    2. 腾讯云数据库 Redis 的集群架构支持多数据库,可以使用 standalone 模式指定写入云数据库 Redis 集群架构中的非0数据库,例如:
    CREATE TABLE `redis_set_sink_table` (
    `key` STRING,
    `value` STRING
    ) WITH (
    'connector' = 'redis', -- 输出到Redis
    'command' = 'set', -- set命令
    'nodes' = '<host>:<port>', -- redis连接地址
    'password' = '<password>',
    'redis-mode' = 'standalone',
    'database' = '1'
    );
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持