Upsert Kafka
Last updated: 2023-11-08 11:20:39

Overview

The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
As a source, the Upsert Kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT (INSERT/UPDATE) because any existing row with the same key is overwritten. Also, a record with a null value represents a DELETE.
As a sink, the Upsert Kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as normal Kafka messages, and write DELETE data as Kafka messages with null values (which indicates the messages will be deleted). Flink will guarantee the message ordering on the primary key by partitioning data based on the values of the primary key columns, so the update/delete messages on the same key will fall into the same partition.
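
For illustration, the following is a minimal sketch of these semantics (the table name users_changelog, the topic name users, and the broker address are placeholders, not part of this document): it reads an Upsert Kafka topic as a changelog source, where each record is an upsert on its key and a null-valued record deletes that key, so queries over the table always see the latest value per key.
CREATE TABLE `users_changelog` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',                        -- Placeholder topic name.
  'properties.bootstrap.servers' = '...',   -- Replace this with your Kafka connection address.
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Deleted keys (null-valued messages) are excluded, so the count reflects the current live rows.
SELECT COUNT(*) AS `current_row_count` FROM `users_changelog`;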

Versions

Flink Version    Description
1.11             Unsupported
1.13             Supported
1.14             Supported
1.16             Supported

Defining a table in DDL

CREATE TABLE kafka_upsert_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- Define Upsert Kafka parameters.
'connector' = 'upsert-kafka', -- Specify the connector.
'topic' = 'topic', -- Replace this with the topic you want to read from and write to.
'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
'key.format' = 'json', -- Define the data format of keys.
'value.format' = 'json' -- Define the data format of values.
);
Note
Make sure you define the primary key in the DDL.

WITH parameters

connector (required; data type: String; default: none)
  The connector to use. For Upsert Kafka, use 'upsert-kafka'.

topic (required; data type: String; default: none)
  The name of the topic to read from and write to.

properties.bootstrap.servers (required; data type: String; default: none)
  A list of Kafka brokers, separated by commas.

properties.* (optional; data type: String; default: none)
  Arbitrary Kafka configurations. The suffixes must match the configuration keys defined in the Kafka configuration documentation. Flink removes the "properties." prefix and passes the remaining keys and values to the underlying Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation. However, some configurations, such as 'key.deserializer' and 'value.deserializer', cannot be set with this option because Flink overrides them. A combined example follows this table.

key.format (required; data type: String; default: none)
  The format used to deserialize and serialize the key part of Kafka messages. The key fields are defined by PRIMARY KEY. Valid values include 'csv', 'json', and 'avro'.

key.fields-prefix (optional; data type: String; default: none)
  A custom prefix for all fields of the key format to avoid name conflicts with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields work with the prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires value.fields-include to be set to EXCEPT_KEY.

value.format (required; data type: String; default: none)
  The format used to deserialize and serialize the value part of Kafka messages. Valid values include 'csv', 'json', and 'avro'.

value.fields-include (optional; data type: String; default: 'ALL')
  A strategy specifying how to deal with key columns in the data type of the value format. Valid values:
  ALL: All physical columns of the table schema are included in the value format, including columns defined as primary keys.
  EXCEPT_KEY: All physical columns of the table schema are included in the value format except columns defined as primary keys.

sink.parallelism (optional; data type: Integer; default: none)
  The parallelism of the Upsert Kafka sink operator. By default, the parallelism is determined by the framework and matches the parallelism of the upstream chained operator.

sink.buffer-flush.max-rows (optional; data type: Integer; default: 0)
  The maximum number of buffered records before a flush. When the sink receives many updates on the same key, the buffer retains only the last record for that key. This helps reduce data shuffling and avoids possible tombstone messages to the Kafka topic. To disable buffer flushing, set this parameter to 0 (the default). Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be greater than zero to enable sink buffer flushing.

sink.buffer-flush.interval (optional; data type: Duration; default: 0)
  The interval at which asynchronous threads flush data. When the sink receives many updates on the same key, the buffer retains only the last record for that key. This helps reduce data shuffling and avoids possible tombstone messages to the Kafka topic. To disable buffer flushing, set this parameter to 0 (the default). Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be greater than zero to enable sink buffer flushing.
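
As a combined illustration of the options above, the following sketch (the table, topic, and field names are placeholders, not part of this document) prefixes the key fields, excludes them from the value format, enables sink buffering, and passes an extra Kafka client property through properties.*.
CREATE TABLE `order_totals` (
  `key_order_id` BIGINT,                              -- Physical key column, declared with the configured prefix.
  `total_amount` DOUBLE,
  PRIMARY KEY (`key_order_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'order-totals',                           -- Placeholder topic name.
  'properties.bootstrap.servers' = '...',             -- Replace this with your Kafka connection address.
  'properties.allow.auto.create.topics' = 'false',    -- Passed through to the Kafka client via properties.*.
  'key.format' = 'json',
  'key.fields-prefix' = 'key_',                       -- The key format sees the field as 'order_id' (prefix removed).
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY',              -- Required when key.fields-prefix is set; the value omits key columns.
  'sink.buffer-flush.max-rows' = '1000',              -- Both buffer options must be greater than zero to enable buffering.
  'sink.buffer-flush.interval' = '1s'
);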

Example

CREATE TABLE `kafka_json_source_table` (
`id` INT,
`name` STRING
) WITH (
-- Define Kafka parameters.
'connector' = 'kafka',
'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
'properties.group.id' = 'testGroup', -- (Required) The group ID.

-- Define the data format (JSON).
'format' = 'json',
'json.fail-on-missing-field' = 'false', -- If this is 'false', no errors will occur even when fields are missing.
'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
);

CREATE TABLE kafka_upsert_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- Define Upsert Kafka parameters.
'connector' = 'upsert-kafka', -- Specify the connector.
'topic' = 'topic', -- Replace this with the topic you want to write data to.
'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
'key.format' = 'json', -- Define the data format of keys.
'value.format' = 'json' -- Define the data format of values.
);

-- Insert data from the Kafka source table into the Upsert Kafka sink table.
INSERT INTO kafka_upsert_sink_table
SELECT * FROM kafka_json_source_table;
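
A typical use of the Upsert Kafka sink is writing an aggregation changelog. The following sketch (the sink table kafka_pvuv_sink_table, its topic, and the pv/uv semantics are assumptions for illustration, reusing the source table above) counts rows and distinct names per id; each new count overwrites the previous value for that key in the topic.
CREATE TABLE `kafka_pvuv_sink_table` (
  `id` INT,
  `pv` BIGINT,
  `uv` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pvuv-output',                  -- Placeholder topic name.
  'properties.bootstrap.servers' = '...',   -- Replace this with your Kafka connection address.
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The GROUP BY aggregation produces an update stream; the sink keeps only the latest counts per id.
INSERT INTO `kafka_pvuv_sink_table`
SELECT `id`, COUNT(*) AS `pv`, COUNT(DISTINCT `name`) AS `uv`
FROM `kafka_json_source_table`
GROUP BY `id`;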

SASL authentication

SASL/PLAIN username and password authentication

1. Follow the instructions in Message Queue CKafka - Configuring ACL Policy to configure username/password-based authentication (SASL_PLAINTEXT) for the topic.
2. Select "SASL_PLAINTEXT" as the access mode when adding a routing policy and access the topic via the address of that mode.
3. Configure WITH parameters for the job.
CREATE TABLE `YourTable` (
...
) WITH (
...
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
...
);
Note
The username should be the instance ID + # + the username you set, and the password should be the password you configured.
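
Putting the pieces together, a complete Upsert Kafka sink definition with SASL/PLAIN authentication might look like the following sketch (the table name, topic, and credentials are placeholders).
CREATE TABLE `kafka_upsert_sasl_sink_table` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic',                        -- Replace this with the topic you want to write data to.
  'properties.bootstrap.servers' = '...',   -- Replace this with the SASL_PLAINTEXT routing address.
  'key.format' = 'json',
  'value.format' = 'json',
  -- SASL/PLAIN authentication, as described above.
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";'
);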

SASL/GSSAPI Kerberos authentication

Currently, Tencent Cloud CKafka does not support Kerberos authentication. If your self-built Kafka supports Kerberos authentication, you can follow the steps below to configure it.
1. Get the Kerberos configuration file for your self-built Kafka cluster. If your cluster is built on top of Tencent Cloud EMR, get the files krb5.conf and emr.keytab in the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
2. Package the files into a JAR file.
jar cvf kafka-xxx.jar krb5.conf emr.keytab
3. Check the JAR structure (run the Vim command vim kafka-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
5. Get Kerberos principals to configure advanced job parameters.
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. Configure WITH parameters for the job.
CREATE TABLE `YourTable` (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name' = 'hadoop',
...
);
Note
The value of properties.sasl.kerberos.service.name must match the principal you use. For example, if you use hadoop/${IP}@EMR-OQPO48B9, the parameter value should be hadoop.
The advanced job parameters are configured as follows:
security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
security.kerberos.login.contexts: KafkaClient
fs.hdfs.hadoop.security.authentication: kerberos
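
For reference, a complete Upsert Kafka table definition using the GSSAPI settings above might look like the following sketch (the table name and topic are placeholders; the advanced job parameters listed above still need to be configured separately).
CREATE TABLE `kafka_upsert_kerberos_table` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic',                        -- Replace this with your topic.
  'properties.bootstrap.servers' = '...',   -- Replace this with your Kafka connection address.
  'key.format' = 'json',
  'value.format' = 'json',
  -- SASL/GSSAPI (Kerberos) authentication, as described above.
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'hadoop'  -- Must match the principal, e.g. hadoop/${IP}@EMR-OQPO48B9.
);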
