ClickHouse
Last updated: 2023-11-08 16:16:02

Overview

The ClickHouse sink connector supports writing data to ClickHouse data warehouses. The ClickHouse source connector allows ClickHouse to be used as a batch source and as a dimension table.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Supported (use as sink) |
| 1.13 | Supported (use as source and sink) |
| 1.14 | Supported (use as source and sink) |
| 1.16 | Supported (use as source and sink) |

Limits

The ClickHouse connector does not support standard UPDATE and DELETE operations. If you need UPDATE and DELETE semantics with a ClickHouse sink, see the CollapsingMergeTree table engine section in the FAQs.
For JAR jobs written in Java or Scala, data can be written to ClickHouse through JDBC; this is not covered here.

Defining a table in DDL

As a sink with insert only

CREATE TABLE clickhouse_sink_table (
`id` INT,
`name` STRING
) WITH (
-- Specify the parameters for database connection.
'connector' = 'clickhouse', -- Specify the connector to use. Here, it should be `clickhouse`.
'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
-- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
--'username' = 'root', -- The ClickHouse cluster username.
--'password' = 'root', -- The ClickHouse cluster password.
'database-name' = 'db', -- The database to write data to.
'table-name' = 'table', -- The table to write data to.
'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
);

As a sink with upsert

CREATE TABLE clickhouse_upsert_sink_table (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
) WITH (
-- Specify the parameters for database connection.
'connector' = 'clickhouse', -- Specify the connector to use. Here, it should be `clickhouse`.
'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
-- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
--'username' = 'root', -- The ClickHouse cluster username.
--'password' = 'root', -- The ClickHouse cluster password.
'database-name' = 'db', -- The database to write data to.
'table-name' = 'table', -- The table to write data to.
'table.collapsing.field' = 'Sign', -- The name of the CollapsingMergeTree type column field.
'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
);
Note
You must define a primary key and declare the table.collapsing.field field to support upsert. For the ClickHouse table creation statement, see FAQs.

As a batch source

CREATE TABLE `clickhouse_batch_source` (
`when` TIMESTAMP,
`userid` BIGINT,
`bytes` FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.1.21:8123',
'database-name' = 'dts',
'table-name' = 'download_dist'
-- 'scan.by-part.enabled' = 'false', -- Whether to read the ClickHouse table by part. To enable this, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading.
-- 'scan.part.modification-time.lower-bound' = '2021-09-24 16:00:00', -- The minimum `modification_time` (inclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
-- 'scan.part.modification-time.upper-bound' = '2021-09-17 19:16:26', -- The maximum `modification_time` (exclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
-- 'local.read-write' = 'false', -- Whether to read the local table. Default value: false.
-- 'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123,172.28.1.109:8123,172.28.1.36:8123' -- The list of local nodes (the HTTP port number is required). Note that only one replica node address can be configured for each shard; otherwise, duplicate data may be read.
);
Note
Only MergeTree engines support reading by part, and you must stop backend merge and TTL-based data deletions on all nodes with the table being read to avoid inaccurate data reading caused by part changes. Only one replica node address can be configured for each shard for local reading; otherwise, duplicate data may be read.
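For reference, the following is a minimal sketch of pausing and later resuming background processing for a by-part read; it assumes the dts.download_dist table from the example above and must be executed on every node of the cluster.
-- Run on every ClickHouse node before enabling 'scan.by-part.enabled'.
SYSTEM STOP MERGES dts.download_dist;
SYSTEM STOP TTL MERGES dts.download_dist;
-- After the batch read completes, resume background merges and TTL processing.
SYSTEM START MERGES dts.download_dist;
SYSTEM START TTL MERGES dts.download_dist;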

As a dimension table

CREATE TABLE `clickhouse_dimension` (
`userid` BIGINT,
`comment` STRING
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.1.21:8123',
'database-name' = 'dimension',
'table-name' = 'download_dist',
'lookup.cache.max-rows' = '500', -- The maximum number of data records allowed in the Lookup Cache.
'lookup.cache.ttl' = '10min', -- The maximum cache time of each record.
'lookup.max-retries' = '10' -- The maximum number of retries upon database query failure.
);
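A dimension table defined this way is typically queried with a lookup (temporal) join. The following is a minimal sketch, assuming a hypothetical source table `orders` with an `order_id` column and a processing-time attribute `proctime`:
-- Enrich each order with the `comment` column looked up from ClickHouse by `userid`.
SELECT o.order_id, o.userid, d.comment
FROM orders AS o
LEFT JOIN clickhouse_dimension FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.userid = d.userid;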

WITH parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | - | The connector to use. To read from or write to ClickHouse, it should be clickhouse. |
| url | Yes | - | The ClickHouse cluster connection URL, such as 'clickhouse://127.1.1.1:8123', which can be viewed on the cluster page. |
| username | No | - | The ClickHouse cluster username. |
| password | No | - | The ClickHouse cluster password. |
| database-name | Yes | - | The ClickHouse cluster database. |
| table-name | Yes | - | The ClickHouse cluster table. |
| sink.batch-size | No | 1000 | The number of records that triggers a batch write. |
| sink.flush-interval | No | 1000 (unit: ms) | The interval at which async threads flush data to the ClickHouse connector. |
| table.collapsing.field | No | - | The name of the CollapsingMergeTree type column field. |
| sink.max-retries | No | 3 | The number of retries upon write failure. |
| local.read-write | No | false | Whether to enable writing to the local table. The default value is false. Note: This feature is available only to advanced users and must be used together with the parameters described in the "Writing to local tables" section of this document. |
| table.local-nodes | No | - | The list of local nodes when local.read-write is set to true. Example: '127.1.1.10:8123,127.1.2.13:8123' (the HTTP port number is required). |
| sink.partition-strategy | No | balanced | The partition strategy when local.read-write is set to true. If you want to achieve dynamic updates and the table engine is CollapsingMergeTree, the value must be hash, and sink.partition-key must be set. Valid values: balanced (round-robin), shuffle (random), and hash (by sink.partition-key). |
| sink.partition-key | No | - | Required when local.read-write is set to true and sink.partition-strategy to hash. The value is the primary key defined in the table. If the primary key consists of multiple fields, specify the first field. |
| sink.ignore-delete | No | false | Whether to ignore all DELETE messages written to ClickHouse. This option is suitable for scenarios where the ReplacingMergeTree table engine is used and dynamic data updates are expected. |
| sink.backpressure-aware | No | false | If the "Too many parts" error frequently appears in Flink logs and causes job crashes, enable this option to significantly reduce the server load and improve overall throughput and stability. |
| sink.reduce-batch-by-key | No | false | Whether to merge records with the same key within a flush interval by retaining only the last record, for ClickHouse sink tables that have a primary key defined. |
| sink.max-partitions-per-insert | No | 20 | When the ClickHouse table is partitioned and its partitioning function is intHash32, toYYYYMM, or toYYYYMMDD, the sink buffers data by partition and writes to ClickHouse once the number of accumulated partitions reaches this value (a write is triggered earlier if sink.flush-interval or sink.batch-size is reached first). This greatly improves write throughput. Setting this option to -1 disables aggregated writing for partitioned tables. |
| scan.fetch-size | No | 100 | The number of rows fetched per batch when reading from the database. |
| scan.by-part.enabled | No | false | Whether to read the ClickHouse table by part. If this is enabled, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading. |
| scan.part.modification-time.lower-bound | No | - | The minimum modification_time (inclusive) for filtering ClickHouse table parts, in the format of yyyy-MM-dd HH:mm:ss. |
| scan.part.modification-time.upper-bound | No | - | The maximum modification_time (exclusive) for filtering ClickHouse table parts, in the format of yyyy-MM-dd HH:mm:ss. |
| lookup.cache.max-rows | No | - | The maximum number of data records cached in the Lookup Cache. |
| lookup.cache.ttl | No | - | The maximum cache time of each data record in the Lookup Cache. |
| lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
Note
When defining WITH parameters, you usually only need to specify the required ones. When you enable optional parameters, be sure to understand their meanings and how they may affect data writing.

Data type mapping

For the data types supported by ClickHouse, see ClickHouse Data Types. The table below lists some of the common data types and their counterparts in Flink. We recommend that you map Flink data types to ClickHouse data types according to the table below to avoid unexpected results. Pay special attention to the following:
DateTime: ClickHouse supports a time precision of 1 second, and its default date_time_input_format setting is basic, so it can only parse time in the format YYYY-MM-DD HH:MM:SS or YYYY-MM-DD. If your job encounters time parsing exceptions (for example, java.sql.SQLException: Code: 6. DB::Exception: Cannot parse string '2023-05-24 14:34:55.166' as DateTime), refer to the table below to change the corresponding Flink data type to TIMESTAMP(0), or change the date_time_input_format setting of the ClickHouse cluster to best_effort. In addition, ClickHouse supports inserting DateTime data in integer format, so you can also map the type as INTEGER in Flink, but this is not recommended.
DateTime64: ClickHouse supports inserting DateTime64 data in integer and DECIMAL formats, so you may choose to map it as BIGINT or DECIMAL in Flink. If you map it as BIGINT, ensure that the BIGINT value to write matches the precision of DateTime64. For example, the default precision for DateTime64 is milliseconds, that is, DateTime64(3) during table creation in ClickHouse, so the BIGINT value to write should also be in milliseconds, for example, 1672542671000 (2023-01-01 11:11:11.000). To avoid issues, we recommend mapping it as the TIMESTAMP data type according to the table below (see the sketch after the table).
| ClickHouse Type | Flink Type | Java Type |
| --- | --- | --- |
| String | VARCHAR/STRING | String |
| FixedString(N) | VARCHAR/STRING | String |
| Bool | BOOLEAN | Byte |
| Int8 | TINYINT | Byte |
| UInt8 | SMALLINT | Short |
| Int16 | SMALLINT | Short |
| UInt16 | INTEGER | Integer |
| Int32 | INTEGER | Integer |
| UInt32 | BIGINT | Long |
| Int64 | BIGINT | Long |
| UInt64 | BIGINT | Long |
| Int128 | DECIMAL | BigInteger |
| UInt128 | DECIMAL | BigInteger |
| Int256 | DECIMAL | BigInteger |
| UInt256 | DECIMAL | BigInteger |
| Float32 | FLOAT | Float |
| Float64 | DOUBLE | Double |
| Decimal(P,S)/Decimal32(S)/Decimal64(S)/Decimal128(S)/Decimal256(S) | DECIMAL | BigDecimal |
| Date | DATE | LocalDateTime |
| DateTime([timezone]) | TIMESTAMP(0) | LocalDateTime |
| DateTime64(precision, [timezone]) | TIMESTAMP(precision) | LocalDateTime |
| Array(T) | ARRAY<T> | T[] |
| Map(K, V) | MAP<K, V> | Map<?, ?> |
| Tuple(T1, T2, ...) | ROW<f1 T1, f2 T2, ...> | List<Object> |
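As an illustration of the recommended DateTime and DateTime64 mappings, here is a minimal sketch; the database, table, and column names (db.events, event_time, event_time_ms) are hypothetical, and the ClickHouse table is assumed to define those columns as DateTime and DateTime64(3).
-- Hypothetical ClickHouse table:
-- CREATE TABLE db.events (`id` Int32, `event_time` DateTime, `event_time_ms` DateTime64(3)) ENGINE = MergeTree() ORDER BY id;
CREATE TABLE clickhouse_events_sink (
`id` INT,
`event_time` TIMESTAMP(0), -- Maps to ClickHouse DateTime (1-second precision).
`event_time_ms` TIMESTAMP(3) -- Maps to ClickHouse DateTime64(3) (millisecond precision).
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.28.160:8123',
'database-name' = 'db',
'table-name' = 'events'
);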

Example

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- The number of data records generated per second.
);
CREATE TABLE clickhouse_sink_table (
`id` INT,
`name` STRING
) WITH (
-- Specify the parameters for database connection.
'connector' = 'clickhouse', -- Specify the use of the ClickHouse connector.
'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
-- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
--'username' = 'root', -- The ClickHouse cluster username.
--'password' = 'root', -- The ClickHouse cluster password.
'database-name' = 'db', -- The database to write data to.
'table-name' = 'table', -- The table to write data to.
'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
);
insert into clickhouse_sink_table select * from datagen_source_table;

FAQs

Data upsert and delete

ClickHouse does not support upsert. So for scenarios that require dynamic data updates and deletions, ReplacingMergeTree or CollapsingMergeTree is usually used to simulate update or delete operations. These table engines are suitable for different scenarios.

ReplacingMergeTree table engine

If the ClickHouse table engine is ReplacingMergeTree, you can set sink.ignore-delete to true so that Flink will automatically ignore DELETE messages and convert INSERT and UPDATE_AFTER messages into INSERT messages. Then, ClickHouse will automatically use the latest record to overwrite the previous record with the same primary key, achieving data updates.
Note that this mode only supports data insert and update operations, but not delete operations. Therefore, if you have simple ETL scenarios like CDC that require precise synchronization, you can use the CollapsingMergeTree table engine described below to achieve better results.
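For illustration, the following is a minimal sketch of this mode; the database and table names (db.user_profile) are hypothetical, and the ClickHouse table is assumed to use the ReplacingMergeTree engine with id as its sorting key.
-- Hypothetical ClickHouse table; ReplacingMergeTree keeps the latest row per sorting key during merges:
-- CREATE TABLE db.user_profile (`id` Int32, `name` String) ENGINE = ReplacingMergeTree() ORDER BY id;
CREATE TABLE clickhouse_replacing_sink (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.28.160:8123',
'database-name' = 'db',
'table-name' = 'user_profile',
'sink.ignore-delete' = 'true' -- Ignore DELETE messages; INSERT and UPDATE_AFTER messages are written as inserts.
);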

CollapsingMergeTree table engine

If the ClickHouse table engine is CollapsingMergeTree, you can specify Sign by using the table.collapsing.field option. This engine works by sending messages with the same content but opposite Sign values to achieve the deletion (cancellation) of old data and the insertion of new data.
In a production environment, ReplicatedCollapsingMergeTree is commonly used. However, the automatic deduplication feature of ReplicatedMergeTree may result in multiple records written to ClickHouse within a short time being identified as duplicate data, leading to data loss. In such cases, you can specify replicated_deduplication_window=0 when creating or modifying a table to disable the automatic deduplication feature.
Example:
CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;
For more information about automatic deduplication, see Data Replication.

sharding_key of a ClickHouse distributed table

When creating a distributed table, set sharding_key in the statement ENGINE = Distributed(cluster_name, database_name, table_name[, sharding_key]); to the primary key of the sink table in Flink SQL. This ensures that records with the same primary key are written to the same node (see the sketch after the parameter list below).
The parameters in the statement are described below:
cluster_name: The cluster name, which corresponds to the custom name specified in the cluster configuration.
database_name: The database name.
table_name: The table name.
sharding_key: (Optional) The key used for sharding. During the data writing process, the distributed table will distribute data to local tables on different nodes based on the rules defined by the sharding key.
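For example, a minimal sketch pairing a Flink sink table whose primary key is id with a matching distributed table; the cluster, database, and table names reuse the hypothetical test.datagen example from the table creation statements below.
-- Flink SQL sink table (simplified): PRIMARY KEY (`id`) NOT ENFORCED
-- ClickHouse distributed table sharded by the same column, so records with the same `id` are routed to the same shard:
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);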

Writing to local tables (feature for advanced users)

If the local.read-write option is set to true, Flink can directly write to local tables.
Note
Enabling write to local tables can significantly improve the throughput. However, if the ClickHouse cluster undergoes scaling or node replacement later, it may result in uneven data distribution, write failure, or data update failure. Therefore, this feature is intended only for advanced users.
If a table has a primary key and uses UPDATE and DELETE semantics, we recommend you use the CollapsingMergeTree table engine when creating the table. You can specify the Sign field using the table.collapsing.field option and set sink.partition-strategy to hash to ensure that data with the same primary key is distributed to the same shard. In addition, set sink.partition-key to the primary key field (for composite primary keys, set it to the first field).
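The following is a minimal sketch of such a configuration; the node addresses, database, and table names are hypothetical, and the ClickHouse local table is assumed to use the CollapsingMergeTree engine with a Sign column.
CREATE TABLE clickhouse_local_sink (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.28.160:8123',
'database-name' = 'db',
'table-name' = 'table_local', -- The local table on each node.
'table.collapsing.field' = 'Sign', -- The CollapsingMergeTree Sign column.
'local.read-write' = 'true', -- Enable writing to local tables.
'table.local-nodes' = '172.28.1.24:8123,172.28.1.26:8123', -- One replica address per shard (HTTP port required).
'sink.partition-strategy' = 'hash', -- Hash by the partition key so the same primary key goes to the same shard.
'sink.partition-key' = 'id' -- The primary key (the first field for composite primary keys).
);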

Null data

If certain fields in the data may be null, declare them as Nullable in the ClickHouse table creation DDL statement. Otherwise, an error may occur during data writing.
CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id ;

Optimization of writing to partitioned tables

When writing to a ClickHouse partitioned table, if the partition definition in ClickHouse uses functions supported by Stream Compute Service (intHash32, toYYYYMM, and toYYYYMMDD), Stream Compute Service enables the buffering of data by partition before writing by default. With this feature enabled, Stream Compute Service tries to include as few partitions as possible in each batch of data (when the throughput of business partition data is large enough, each batch contains only one partition), thereby improving ClickHouse's merge performance. If the data for a single partition has not reached the batch size, data of multiple partitions is merged into one batch and then written to ClickHouse. You can refer to sink.max-partitions-per-insert described above for detailed configuration options.
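As an illustration, here is a minimal sketch of a day-partitioned target table and the corresponding sink options; the database, table, and column names are hypothetical.
-- Hypothetical ClickHouse table partitioned with toYYYYMMDD, one of the supported partitioning functions:
-- CREATE TABLE db.access_log (`id` Int32, `ts` DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(ts) ORDER BY id;
CREATE TABLE clickhouse_partitioned_sink (
`id` INT,
`ts` TIMESTAMP(0)
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.28.160:8123',
'database-name' = 'db',
'table-name' = 'access_log',
'sink.batch-size' = '8000',
'sink.max-partitions-per-insert' = '20' -- Buffer data by partition; set to -1 to disable partition-aware buffering.
);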

Monitoring metric description

Stream Compute Service adds many practical metrics to the ClickHouse connector. Click the ClickHouse sink operator in the execution graph in the Flink UI and search for one of the following metrics:
numberOfInsertRecords: The number of output +I messages.
numberOfDeleteRecords: The number of output -D messages.
numberOfUpdateBeforeRecords: The number of output -U messages.
numberOfUpdateAfterRecords: The number of output +U messages.

Example: ClickHouse table creation statements

CollapsingMergeTree table creation statement supporting UPDATE and DELETE

-- Creating a database
CREATE DATABASE test ON cluster default_cluster;

-- Creating a local table
CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;

-- Creating a distributed table based on a local table
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);

MergeTree table creation statement including only INSERT

-- Creating a database
CREATE DATABASE test ON cluster default_cluster;

-- Creating a local table
CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64) ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}') ORDER BY id SETTINGS replicated_deduplication_window = 0;

-- Creating a distributed table based on a local table
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);
