Flink Version | Description |
--- | --- |
1.11 | Supported (use as sink) |
1.13 | Supported (use as source and sink) |
1.14 | Supported (use as source and sink) |
1.16 | Supported (use as source and sink) |
CREATE TABLE clickhouse_sink_table (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse',                -- Specify the connector to use. Here, it should be 'clickhouse'.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root',                     -- The ClickHouse cluster username.
    --'password' = 'root',                     -- The ClickHouse cluster password.
    'database-name' = 'db',                    -- The database to write data to.
    'table-name' = 'table',                    -- The table to write data to.
    'sink.batch-size' = '1000'                 -- The number of data records that trigger a batch write.
);
CREATE TABLE clickhouse_upsert_sink_table (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED            -- Define the primary key here if the database table to monitor has it defined.
) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse',                -- Specify the connector to use. Here, it should be 'clickhouse'.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root',                     -- The ClickHouse cluster username.
    --'password' = 'root',                     -- The ClickHouse cluster password.
    'database-name' = 'db',                    -- The database to write data to.
    'table-name' = 'table',                    -- The table to write data to.
    'table.collapsing.field' = 'Sign',         -- The name of the CollapsingMergeTree type column field.
    'sink.batch-size' = '1000'                 -- The number of data records that trigger a batch write.
);
Specify the Sign column of the CollapsingMergeTree engine in the table.collapsing.field field to support upsert. For the ClickHouse table creation statement, see FAQs.

CREATE TABLE `clickhouse_batch_source` (
    `when` TIMESTAMP,
    `userid` BIGINT,
    `bytes` FLOAT
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.1.21:8123',
    'database-name' = 'dts',
    'table-name' = 'download_dist'
    -- 'scan.by-part.enabled' = 'false', -- Whether to read the ClickHouse table by part. To enable this, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading.
    -- 'scan.part.modification-time.lower-bound' = '2021-09-24 16:00:00', -- The minimum `modification_time` (inclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
    -- 'scan.part.modification-time.upper-bound' = '2021-09-17 19:16:26', -- The maximum `modification_time` (exclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
    -- 'local.read-write' = 'false', -- Whether to read the local table. Default value: false.
    -- 'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123,172.28.1.109:8123,172.28.1.36:8123' -- The list of local nodes (the HTTP port number is required). Note that only one replica node address can be configured for each shard; otherwise, duplicate data may be read.
);
CREATE TABLE `clickhouse_dimension` (
    `userid` BIGINT,
    `comment` STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.1.21:8123',
    'database-name' = 'dimension',
    'table-name' = 'download_dist',
    'lookup.cache.max-rows' = '500', -- The maximum number of data records allowed in the Lookup Cache.
    'lookup.cache.ttl' = '10min',    -- The maximum cache time of each record.
    'lookup.max-retries' = '10'      -- The maximum number of retries upon database query failure.
);
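A dimension table like this is typically used in a lookup join. Below is a minimal sketch; the fact stream orders_source, the print sink, and their schemas are illustrative assumptions rather than part of the connector documentation.

-- Hypothetical fact stream, used only for illustration.
CREATE TABLE orders_source (
    `userid` BIGINT,
    `amount` DOUBLE,
    `proctime` AS PROCTIME()  -- Processing-time attribute required by lookup joins.
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Hypothetical result sink; replace with a real sink as needed.
CREATE TABLE enriched_print_sink (
    `userid` BIGINT,
    `amount` DOUBLE,
    `comment` STRING
) WITH (
    'connector' = 'print'
);

-- Enrich each record with the `comment` column looked up from the ClickHouse dimension table.
INSERT INTO enriched_print_sink
SELECT o.`userid`, o.`amount`, d.`comment`
FROM orders_source AS o
LEFT JOIN clickhouse_dimension FOR SYSTEM_TIME AS OF o.`proctime` AS d
ON o.`userid` = d.`userid`;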
Option | Required | Default Value | Description |
--- | --- | --- | --- |
connector | Yes | - | Required if ClickHouse is to be used as the data source or sink. Here, it should be clickhouse. |
url | Yes | - | The ClickHouse cluster connection URL, such as 'clickhouse://127.1.1.1:8123', which can be viewed on the cluster page. |
username | No | - | The ClickHouse cluster username. |
password | No | - | The ClickHouse cluster password. |
database-name | Yes | - | The ClickHouse cluster database. |
table-name | Yes | - | The ClickHouse cluster table. |
sink.batch-size | No | 1000 | The number of buffered records that triggers a batch write to ClickHouse. |
sink.flush-interval | No | 1000 (unit: ms) | The interval for async threads to flush data to the ClickHouse connector. |
table.collapsing.field | No | - | The name of the CollapsingMergeTree type column field. |
sink.max-retries | No | 3 | The number of retries upon write failure. |
local.read-write | No | false | Whether to enable the feature of writing to the local table. The default value is false. Note: This feature is available only to advanced users and must be used together with the parameters described in the "Writing to local table" section of this document. |
table.local-nodes | No | - | The list of local nodes when local.read-write is set to true. Example: '127.1.1.10:8123,127.1.2.13:8123' (the HTTP port number is required). |
sink.partition-strategy | No | balanced | The partition strategy when local.read-write is set to true. If you want to achieve dynamic updates and the table engine is CollapsingMergeTree, the value must be hash, and sink.partition-key must be set. Valid values: balanced (round-robin), shuffle (random), and hash (by sink.partition-key). |
sink.partition-key | No | - | Required when local.read-write is set to true and sink.partition-strategy to hash . The value is the primary key defined in the table. If the primary key consists of multiple fields, you need to specify the value as the first field. |
sink.ignore-delete | No | false | Whether to ignore all DELETE messages that are written to ClickHouse. This option is suitable for scenarios where the ReplacingMergeTree table engine is used and dynamic data updates are expected. |
sink.backpressure-aware | No | false | If the "Too many parts" error message frequently appears in Flink logs, and this causes job crashes, you can enable this option to significantly reduce the server load and improve overall throughput and stability. |
sink.reduce-batch-by-key | No | false | Whether to merge records with the same key within the flush interval, retaining only the last record, for ClickHouse sink tables that have a primary key defined. |
sink.max-partitions-per-insert | No | 20 | When the ClickHouse table is partitioned and the partitioning function is intHash32, toYYYYMM, or toYYYYMMDD, Flink buffers data on the sink side by partition. When the number of accumulated partitions reaches the configured value, the write to downstream ClickHouse is triggered (if sink.flush-interval or sink.batch-size is reached first, they will trigger the write first). This greatly improves the throughput of writing to ClickHouse. Setting this option to -1 disables the aggregation writing feature for partitioned tables. |
scan.fetch-size | No | 100 | The number of rows obtained in batches each time the database is read. |
scan.by-part.enabled | No | false | Whether to read the ClickHouse table by part. If this is enabled, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading. |
scan.part.modification-time.lower-bound | No | - | The minimum modification_time (inclusive) for filtering ClickHouse table parts, in the format of yyyy-MM-dd HH:mm:ss . |
scan.part.modification-time.upper-bound | No | - | The maximum modification_time (exclusive) for filtering ClickHouse table parts, in the format of yyyy-MM-dd HH:mm:ss . |
lookup.cache.max-rows | No | - | The maximum number of data records cached in the Lookup Cache. |
lookup.cache.ttl | No | - | The maximum cache time of each data record in the Lookup Cache. |
lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
Among the WITH parameters above, you usually only need to specify the required ones. When you enable optional parameters, be sure to understand their meanings and how they may affect data writing.

By default, the date_time_input_format parameter of the ClickHouse cluster is set to basic, so it can only parse time in the format of YYYY-MM-DD HH:MM:SS or YYYY-MM-DD. If your job encounters time parsing exceptions (e.g., java.sql.SQLException: Code: 6. DB::Exception: Cannot parse string '2023-05-24 14:34:55.166' as DateTime), refer to the table below to change the corresponding data type in Flink to TIMESTAMP(0), or change the value of date_time_input_format of the ClickHouse cluster to best_effort.
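For reference, the parsing mode can also be switched on the ClickHouse side at the session level (a minimal sketch, assuming you have permission to change this setting; it can likewise be set in a user settings profile):

-- Switch DateTime parsing to the more lenient mode for the current ClickHouse session.
SET date_time_input_format = 'best_effort';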
In addition, ClickHouse supports inserting DateTime data in integer format, so you can also map the type as INTEGER in Flink, but this is not recommended.

For time values with sub-second precision (e.g., 2023-01-01 11:11:11.000), to avoid issues, it is recommended to map them to the TIMESTAMP data type according to the table below.

ClickHouse Type | Flink Type | Java Type |
--- | --- | --- |
String | VARCHAR/STRING | String |
FixedString(N) | VARCHAR/STRING | String |
Bool | BOOLEAN | Byte |
Int8 | TINYINT | Byte |
UInt8 | SMALLINT | Short |
Int16 | SMALLINT | Short |
UInt16 | INTEGER | Integer |
Int32 | INTEGER | Integer |
UInt32 | BIGINT | Long |
Int64 | BIGINT | Long |
UInt64 | BIGINT | Long |
Int128 | DECIMAL | BigInteger |
UInt128 | DECIMAL | BigInteger |
Int256 | DECIMAL | BigInteger |
UInt256 | DECIMAL | BigInteger |
Float32 | FLOAT | Float |
Float64 | DOUBLE | Double |
Decimal(P,S)/Decimal32(S)/Decimal64(S)/Decimal128(S)/Decimal256(S) | DECIMAL | BigDecimal |
Date | DATE | LocalDateTime |
DateTime([timezone]) | TIMESTAMP(0) | LocalDateTime |
DateTime64(precision, [timezone]) | TIMESTAMP(precision) | LocalDateTime |
Array(T) | ARRAY<T> | T[] |
Map(K, V) | MAP<K, V> | Map<?, ?> |
Tuple(T1, T2, ...) | ROW<f1 T1, f2 T2, ...> | List<Object> |
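As an illustration of the mappings above, a Flink table for a hypothetical ClickHouse table containing Int64, Decimal, Array, and DateTime64 columns could be declared as follows (all table and column names are made up for this sketch):

CREATE TABLE clickhouse_typed_sink (
    `id` BIGINT,               -- ClickHouse Int64
    `price` DECIMAL(18, 4),    -- ClickHouse Decimal(18, 4)
    `tags` ARRAY<STRING>,      -- ClickHouse Array(String)
    `event_time` TIMESTAMP(3)  -- ClickHouse DateTime64(3)
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://127.0.0.1:8123',  -- Placeholder cluster address.
    'database-name' = 'db',
    'table-name' = 'events'
);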
CREATE TABLE datagen_source_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'  -- The number of data records generated per second.
);

CREATE TABLE clickhouse_sink_table (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse',                -- Specify the use of the ClickHouse connector.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root',                     -- The ClickHouse cluster username.
    --'password' = 'root',                     -- The ClickHouse cluster password.
    'database-name' = 'db',                    -- The database to write data to.
    'table-name' = 'table',                    -- The table to write data to.
    'sink.batch-size' = '1000'                 -- The number of data records that trigger a batch write.
);

INSERT INTO clickhouse_sink_table SELECT * FROM datagen_source_table;
If the ClickHouse table uses the ReplacingMergeTree engine, you can set sink.ignore-delete to true so that Flink will automatically ignore DELETE messages and convert INSERT and UPDATE_AFTER messages into INSERT messages. Then, ClickHouse will automatically use the latest record to overwrite the previous record with the same primary key, achieving data updates.

If the ClickHouse table uses the CollapsingMergeTree engine, specify the Sign column by using the table.collapsing.field option. This engine works by sending messages with the same content but opposite Sign values to achieve the deletion (cancellation) of old data and the insertion of new data.

For replicated table engines, it is recommended to set replicated_deduplication_window=0 when creating or modifying a table to disable the automatic deduplication feature. For example:

CREATE TABLE testdb.testtable ON CLUSTER default_cluster (
    `id` Int32,
    `name` Nullable(String),
    `age` Nullable(Int32),
    `weight` Nullable(Float64),
    `Sign` Int8
) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign)
ORDER BY id
SETTINGS replicated_deduplication_window = 0;
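As a sketch of the CollapsingMergeTree case, a Flink sink table matching the ClickHouse table above could be declared as follows. The cluster address is a placeholder, and in a sharded cluster you would typically point it at a Distributed table built on top of the local tables.

CREATE TABLE clickhouse_collapsing_sink (
    `id` INT,
    `name` STRING,
    `age` INT,
    `weight` DOUBLE,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://127.0.0.1:8123',  -- Placeholder cluster address.
    'database-name' = 'testdb',
    'table-name' = 'testtable',
    'table.collapsing.field' = 'Sign'       -- The Sign column defined in the ClickHouse table above.
);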
When writing to a distributed table, set the sharding_key in the statement ENGINE = Distributed(cluster_name, database_name, table_name[, sharding_key]); to the primary key of the sink table in Flink SQL. This ensures that records with the same primary key are written to the same node.

When the local.read-write option is set to true, Flink can directly write to local tables. To achieve dynamic data updates in this mode, specify the Sign field using the table.collapsing.field option and set sink.partition-strategy to hash to ensure that data with the same primary key is distributed to the same shard. In addition, set sink.partition-key to the primary key field (for composite primary keys, set it to the first field). As in the example below, fields other than the primary key and the Sign column are defined as Nullable; otherwise, an error may occur in data writing.

CREATE TABLE testdb.testtable ON CLUSTER default_cluster (
    `id` Int32,
    `name` Nullable(String),
    `age` Nullable(Int32),
    `weight` Nullable(Float64),
    `Sign` Int8
) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign)
ORDER BY id;
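Putting these options together, a Flink sink that writes directly to the local tables above might be declared as the sketch below. The cluster address and local node addresses are placeholders; list exactly one replica address per shard.

CREATE TABLE clickhouse_local_sink (
    `id` INT,
    `name` STRING,
    `age` INT,
    `weight` DOUBLE,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://127.0.0.1:8123',                  -- Placeholder cluster address.
    'database-name' = 'testdb',
    'table-name' = 'testtable',
    'local.read-write' = 'true',                            -- Write to local tables directly.
    'table.local-nodes' = '127.0.0.1:8123,127.0.0.2:8123',  -- Placeholder local nodes (one replica per shard, HTTP port required).
    'sink.partition-strategy' = 'hash',                     -- Hash by the partition key so the same key lands on the same shard.
    'sink.partition-key' = 'id',                            -- The primary key field.
    'table.collapsing.field' = 'Sign'                       -- The Sign column of the CollapsingMergeTree table.
);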
When writing to a partitioned table, see the sink.max-partitions-per-insert option described above for detailed configuration options.

The following statements create a CollapsingMergeTree-based table (with a Sign column) for dynamic data update (upsert) scenarios:

-- Creating a database
CREATE DATABASE test ON CLUSTER default_cluster;

-- Creating a local table
CREATE TABLE test.datagen ON CLUSTER default_cluster (
    `id` Int32,
    `name` Nullable(String),
    `age` Nullable(Int32),
    `weight` Nullable(Float64),
    `Sign` Int8
) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}', Sign)
ORDER BY id
SETTINGS replicated_deduplication_window = 0;

-- Creating a distributed table based on a local table
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen
ENGINE = Distributed(default_cluster, test, datagen, id);
The following statements create a ReplicatedMergeTree-based table (no Sign column) for append-only scenarios:

-- Creating a database
CREATE DATABASE test ON CLUSTER default_cluster;

-- Creating a local table
CREATE TABLE test.datagen ON CLUSTER default_cluster (
    `id` Int32,
    `name` Nullable(String),
    `age` Nullable(Int32),
    `weight` Nullable(Float64)
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}')
ORDER BY id
SETTINGS replicated_deduplication_window = 0;

-- Creating a distributed table based on a local table
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen
ENGINE = Distributed(default_cluster, test, datagen, id);
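For the append-only tables above, a matching Flink sink writing through the distributed table test.datagen_all could look like the sketch below. The cluster address is a placeholder; the table and column names come from the statements above.

CREATE TABLE clickhouse_append_sink (
    `id` INT,
    `name` STRING,
    `age` INT,
    `weight` DOUBLE
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://127.0.0.1:8123',  -- Placeholder cluster address.
    'database-name' = 'test',
    'table-name' = 'datagen_all',           -- The distributed table created above.
    'sink.batch-size' = '1000'
);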