Stream Compute Service
Last updated: 2023-11-08 14:54:06
Redis

Overview

The Redis connector supports writing data to Redis and using Redis as a dimension table.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Supported (write to Redis and use as a dimension table) |
| 1.13 | Supported (write to Redis and use as a dimension table) |
| 1.14 | Supported (write to Redis and use as a dimension table) |
| 1.16 | Supported (write to Redis and use as a dimension table) |

Use cases

As a dimension table.
As a sink for tuple and upsert streams.

Defining a table in DDL

SET command (string key)

-- The first column is "key" and the second column "value". The Redis command is "set key value".
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'set', -- The SET command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>'
);

LPUSH command (list key)

-- The first column is "key" and the second column "value". The Redis command is "lpush key value".
CREATE TABLE `redis_lpush_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'lpush', -- The LPUSH command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>'
);


SADD command (set key)

-- The first column is "key" and the second column "value". The Redis command is "sadd key value".
CREATE TABLE `redis_sadd_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'sadd', -- The SADD command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>'
);


HSET command (hash key)

-- The first column is "hash_key" and the second column "hash_value". The Redis command is "hset key hash_key hash_value".
CREATE TABLE `redis_hset_sink_table` (
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'hset', -- The HSET command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- The hash key.
);
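
As a hedged illustration of how rows map onto a hash when additional-key is set, the sketch below writes each generated row as one field of a single hash. The table names `datagen_source_table` and `redis_hset_example_sink` and the hash name `user_names` are made up for this example. It assumes, as the comment above describes, that the first column becomes the hash field and the second column the field value.

```sql
-- Hypothetical source: one random row per second.
CREATE TABLE datagen_source_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Every row is written into the single hash named by 'additional-key'.
CREATE TABLE `redis_hset_example_sink` (
  `hash_key` STRING,
  `hash_value` STRING
) WITH (
  'connector' = 'redis',
  'command' = 'hset',
  'nodes' = '<host>:<port>',
  'password' = '<password>',
  'database' = '<database>',
  'additional-key' = 'user_names' -- Assumed hash name, for illustration only.
);

-- Each row should correspond to: hset user_names <id> <name>
INSERT INTO `redis_hset_example_sink`
SELECT CAST(id AS STRING), name
FROM datagen_source_table;
```

Use hset with additional-key when all rows belong to one hash. When the hash name varies per row, the three-column HSET_WITH_KEY form is the better fit.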


HSET_WITH_KEY command (hash key)

-- The first column is "key", the second column "hash_key", and the third column "hash_value". The Redis command is "hset key hash_key hash_value".
CREATE TABLE `redis_hset_sink_table` (
`key` STRING,
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'hset_with_key', -- The HSET_WITH_KEY command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>'
);


HMSET command

-- The first column is "key", the second column "fieldKey" (the hash field), and the third column "fieldValue" (the field value). The Redis command to insert data is "hmset key fieldKey fieldValue".
CREATE TABLE `redis_hmset_sink_table` (
`key` STRING,
`fieldKey` STRING,
`fieldValue` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'hmset', -- The HMSET command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>'
);

ZADD command (sorted set key)

-- The first column is "value" and the second column "score". The Redis command is "zadd key score value", where "key" is taken from the additional-key option.
CREATE TABLE `redis_zadd_sink_table` (
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'zadd', -- The ZADD command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- The sorted set key.
);


ZADD_WITH_KEY command (sorted set key)

-- The first column is "key", the second column "value", and the third column "score". The Redis command is "zadd key score value".
CREATE TABLE `redis_zadd_sink_table` (
`key` STRING,
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'zadd_with_key', -- The ZADD_WITH_KEY command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'database' = '<database>'
);
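
The following sketch shows one way a three-column stream might feed a sorted set sink where the sorted set name varies per row. The names `score_source_table`, `redis_leaderboard_sink`, `board`, and `player` are hypothetical. It assumes the column order described above (key, value, score) and that the zadd_with_key command listed under WITH Parameters is the one to use for this layout.

```sql
-- Hypothetical source with a per-row sorted set name, member, and score.
CREATE TABLE score_source_table (
  board STRING,
  player STRING,
  score DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Columns are positional: sorted set key, member value, score.
CREATE TABLE `redis_leaderboard_sink` (
  `key` STRING,
  `value` STRING,
  `score` DOUBLE
) WITH (
  'connector' = 'redis',
  'command' = 'zadd_with_key',
  'nodes' = '<host>:<port>',
  'password' = '<password>',
  'database' = '<database>'
);

-- Each row should correspond to: zadd <board> <score> <player>
INSERT INTO `redis_leaderboard_sink`
SELECT board, player, score
FROM score_source_table;
```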

GET command (dimension table)

CREATE TABLE `redis_dimension_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis',
'command' = 'get', -- The GET command.
'nodes' = '<host>:<port>', -- The Redis server connection address. For a cluster mode, separate nodes by comma.
'lookup.cache.max-rows' = '500', -- The maximum number of data records allowed in the Lookup Cache.
'lookup.cache.ttl' = '10 min', -- The maximum cache time of each record.
'lookup.max-retries' = '5' -- The maximum number of retries after query failure.
-- Optional settings. When you uncomment any of them, add a comma after the preceding option.
-- 'password' = '<password>', -- The password (optional).
-- 'database' = '<database>', -- The database (optional), which defaults to `0`.
-- 'redis-mode' = 'standalone' -- The Redis mode (optional), which defaults to `standalone`. (Valid values: `standalone` and `cluster`.)
);
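
A dimension table is typically queried through a lookup join. The sketch below uses standard Flink SQL syntax (`FOR SYSTEM_TIME AS OF` with a processing-time attribute) to enrich a stream with values fetched from Redis. The tables `orders_source_table` and `enriched_print_sink`, the computed columns, and the use of the built-in `print` connector are assumptions made for illustration and are not part of the connector documentation.

```sql
-- Hypothetical stream to enrich. The lookup join needs a processing-time attribute.
CREATE TABLE orders_source_table (
  id INT,
  name STRING,
  id_str AS CAST(id AS STRING), -- Join key in the same type as the Redis key.
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Redis dimension table: "get <key>" returns the value column.
CREATE TABLE `redis_dimension_table` (
  `key` STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'command' = 'get',
  'nodes' = '<host>:<port>',
  'lookup.cache.max-rows' = '500',
  'lookup.cache.ttl' = '10 min',
  'lookup.max-retries' = '5'
);

-- Assumed sink for inspecting results; replace with any available sink.
CREATE TABLE enriched_print_sink (
  id INT,
  name STRING,
  dim_value STRING
) WITH (
  'connector' = 'print'
);

-- For each incoming row, look up the Redis value stored under the row's id.
INSERT INTO enriched_print_sink
SELECT s.id, s.name, d.`value`
FROM orders_source_table AS s
LEFT JOIN `redis_dimension_table` FOR SYSTEM_TIME AS OF s.proc_time AS d
ON s.id_str = d.`key`;
```

With a LEFT JOIN, rows whose key is missing in Redis are still emitted, with `dim_value` set to NULL.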

WITH Parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | - | Set this to redis. |
| command | Yes | - | The operation command. The values and corresponding key types are: set (string key), lpush (list key), sadd (set key), hset (hash key), hset_with_key (hash key), hmset (hash key), zadd (sorted set key), zadd_with_key (sorted set key), and get (string key, used for dimension table lookups). |
| nodes | Yes | - | The Redis server connection address, such as 127.0.0.1:6379. For the cluster architecture, separate nodes with commas. |
| password | No | Null | The Redis password. The default, null, means that no authentication is required. |
| database | No | 0 | The number of the database to operate on. |
| redis-mode | No | standalone | The Redis deployment mode. standalone: standard architecture. cluster: distributed cluster architecture. |
| ignore-delete | No | false | Whether to ignore retraction messages. |
| additional-ttl | No | - | The expiration time in seconds. For example, a value of 60 sets the expiration time to 60 seconds. Only the SET command supports an expiration time. |
| additional-key | - | - | The key used by the HSET or ZADD command. It is required when the command is hset or zadd. |
| lookup.cache.max-rows | No | - | The maximum number of data records allowed in the Lookup Cache. |
| lookup.cache.ttl | No | 10s | The maximum cache time of each record. |
| lookup.max-retries | No | 1 | The maximum number of retries after query failure. |
| lookup.cache.caching-missing-key | No | true | Whether to cache an empty result. |
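
To show how the optional write-side parameters combine, here is a minimal sketch of a SET sink whose keys expire after 60 seconds and which drops retraction messages instead of applying them. The table name `redis_set_ttl_sink_table` is hypothetical, and the option values are examples only. The option names come from the table above.

```sql
CREATE TABLE `redis_set_ttl_sink_table` (
  `key` STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'command' = 'set',         -- additional-ttl applies only to the SET command.
  'nodes' = '<host>:<port>',
  'password' = '<password>',
  'database' = '0',
  'additional-ttl' = '60',   -- Each written key expires after 60 seconds.
  'ignore-delete' = 'true'   -- Retraction messages are ignored.
);
```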

Example

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- The number of data records generated per second.
);

CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'set', -- The SET command.
'nodes' = '127.0.0.1:8121', -- The Redis server connection address.
'password' = '<password>',
'database' = '0'
);
INSERT INTO redis_set_sink_table SELECT CAST(id AS STRING), name FROM datagen_source_table;
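
The Use cases section also lists upsert streams. The sketch below is one hedged way that could look: an aggregation produces a continuously updated count per bucket, and each update overwrites the value stored under the same key. The names `events_source_table`, `redis_count_sink_table`, and `bucket` are hypothetical. It assumes the sink accepts the changelog produced by `GROUP BY` without further configuration, which the source does not state explicitly.

```sql
-- Hypothetical source with ids limited to 0..99 so buckets repeat.
CREATE TABLE events_source_table (
  id INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '0',
  'fields.id.max' = '99'
);

CREATE TABLE `redis_count_sink_table` (
  `key` STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'command' = 'set',
  'nodes' = '<host>:<port>',
  'password' = '<password>',
  'database' = '0'
);

-- Each new event updates the running count of its bucket.
-- The updated count is written with "set <bucket> <count>".
INSERT INTO `redis_count_sink_table`
SELECT bucket, CAST(cnt AS STRING)
FROM (
  SELECT CAST(MOD(id, 10) AS STRING) AS bucket, COUNT(*) AS cnt
  FROM events_source_table
  GROUP BY CAST(MOD(id, 10) AS STRING)
);
```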

Notes

1. A self-built Redis cluster does not support multiple databases, so the database option has no effect in cluster mode.
2. The cluster architecture of TencentDB for Redis supports multiple databases. To write to a database other than database 0, connect in standalone mode and specify the database. Example:
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- The connector (Redis) to use.
'command' = 'set', -- The SET command.
'nodes' = '<host>:<port>', -- The Redis server connection address.
'password' = '<password>',
'redis-mode' = 'standalone',
'database' = '1'
);
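
For comparison, a sink that targets a self-built Redis cluster might be declared as in the sketch below. The table name `redis_cluster_set_sink_table` and the three node placeholders are illustrative. The sketch relies only on the nodes and redis-mode options described under WITH Parameters, and it omits the database option because that option has no effect in cluster mode.

```sql
CREATE TABLE `redis_cluster_set_sink_table` (
  `key` STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'command' = 'set',
  -- Cluster nodes are separated by commas.
  'nodes' = '<host1>:<port1>,<host2>:<port2>,<host3>:<port3>',
  'password' = '<password>',
  'redis-mode' = 'cluster'
);
```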
