| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported |
| 1.14 | Supported |
| 1.16 | Supported |
```sql
CREATE TABLE kafka_upsert_sink_table (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  -- Define Upsert Kafka parameters.
  'connector' = 'upsert-kafka',            -- Specify the connector.
  'topic' = 'topic',                       -- Replace this with the topic you want to consume data from.
  'properties.bootstrap.servers' = '...',  -- Replace this with your Kafka connection address.
  'key.format' = 'json',                   -- Define the data format of keys.
  'value.format' = 'json'                  -- Define the data format of values.
);
```
| Option | Required | Default Value | Data Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | - | String | The connector to use. For Upsert Kafka, use 'upsert-kafka'. |
| topic | Yes | - | String | The name of the topic to read from and write to. |
| properties.bootstrap.servers | Yes | - | String | A comma-separated list of Kafka brokers. |
| properties.* | No | - | String | Arbitrary Kafka configurations. The suffixes must match the configuration keys defined in the Kafka configuration document. Flink removes the "properties." prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation. However, some configurations, such as 'key.deserializer' and 'value.deserializer', cannot be set using this option because Flink will override them. |
| key.format | Yes | - | String | The format used to deserialize and serialize the key part of Kafka messages. The key fields are defined by PRIMARY KEY. Valid values include 'csv', 'json', and 'avro'. |
| key.fields-prefix | No | - | String | A custom prefix for all fields of the key format, used to avoid name conflicts with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields work with prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires value.fields-include to be set to EXCEPT_KEY. |
| value.format | Yes | - | String | The format used to deserialize and serialize the value part of Kafka messages. Valid values include 'csv', 'json', and 'avro'. |
| value.fields-include | No | 'ALL' | String | A strategy specifying how to handle key columns in the data type of the value format. Valid values: ALL: all physical columns of the table schema are included in the value format, including the columns defined as primary keys. EXCEPT_KEY: all physical columns of the table schema are included in the value format except the columns defined as primary keys. |
| sink.parallelism | No | - | Integer | The parallelism of the Upsert Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
| sink.buffer-flush.max-rows | No | 0 | Integer | The maximum number of buffered records before a flush. When the sink receives many updates on the same key, the buffer retains only the last record for that key. This helps reduce data shuffling and avoids possible tombstone messages to the Kafka topic. Set this parameter to 0 (the default) to disable buffer flushing. Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero to enable sink buffer flushing. |
| sink.buffer-flush.interval | No | 0 | Duration | The interval at which the asynchronous thread flushes data. When the sink receives many updates on the same key, the buffer retains only the last record for that key. This helps reduce data shuffling and avoids possible tombstone messages to the Kafka topic. Set this parameter to 0 (the default) to disable buffer flushing. Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero to enable sink buffer flushing. |
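For example, a minimal sketch of a sink DDL that combines key.fields-prefix, value.fields-include, and the sink buffering options (the topic name, broker address, and column names are placeholders, not values from the examples above):

```sql
-- A sketch of an Upsert Kafka sink that uses a key-field prefix and sink buffering.
-- Topic name, broker address, and column names are placeholders.
CREATE TABLE kafka_upsert_sink_with_prefix (
  key_id INT,                               -- Key column; the 'key_' prefix is stripped in the key format.
  name STRING,
  PRIMARY KEY (key_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'key.fields-prefix' = 'key_',             -- Requires value.fields-include = EXCEPT_KEY.
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY',    -- Exclude primary key columns from the value format.
  'sink.buffer-flush.max-rows' = '1000',    -- Both buffer options must be > 0 to enable buffering.
  'sink.buffer-flush.interval' = '1s'
);
```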
```sql
CREATE TABLE `kafka_json_source_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Define Kafka parameters.
  'connector' = 'kafka',
  'topic' = 'Data-Input',                     -- Replace this with the topic you want to consume data from.
  'scan.startup.mode' = 'latest-offset',      -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
  'properties.bootstrap.servers' = '172.28.28.13:9092',  -- Replace this with your Kafka connection address.
  'properties.group.id' = 'testGroup',        -- (Required) The group ID.
  -- Define the data format (JSON).
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',     -- If this is 'false', no errors will occur even when fields are missing.
  'json.ignore-parse-errors' = 'true'         -- If this is 'true', all parse errors will be ignored.
);

CREATE TABLE kafka_upsert_sink_table (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  -- Define Upsert Kafka parameters.
  'connector' = 'upsert-kafka',            -- Specify the connector.
  'topic' = 'topic',                       -- Replace this with the topic you want to consume data from.
  'properties.bootstrap.servers' = '...',  -- Replace this with your Kafka connection address.
  'key.format' = 'json',                   -- Define the data format of keys.
  'value.format' = 'json'                  -- Define the data format of values.
);

-- Insert the source data into the Upsert Kafka sink.
INSERT INTO kafka_upsert_sink_table
SELECT * FROM kafka_json_source_table;
```
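Upsert Kafka is typically used to store the changelog of an aggregation, where each new result for a key overwrites the previous record. As a hedged sketch of such a pv/uv-style aggregation (the table names, column names, and topics below are assumptions for illustration, not part of the example above):

```sql
-- A sketch of writing an aggregation changelog to an Upsert Kafka sink.
-- Table names, column names (user_id, page_id), and topics are hypothetical.
CREATE TABLE page_view_source (
  user_id BIGINT,
  page_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'page-views',                  -- Placeholder topic.
  'properties.bootstrap.servers' = '...',  -- Placeholder broker address.
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE page_stats_sink (
  page_id BIGINT,
  pv BIGINT,                               -- Page views: total number of records per page.
  uv BIGINT,                               -- Unique visitors: distinct users per page.
  PRIMARY KEY (page_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'page-stats',                  -- Placeholder topic.
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each new result for a page_id overwrites the previous record in the sink.
INSERT INTO page_stats_sink
SELECT page_id, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv
FROM page_view_source
GROUP BY page_id;
```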
```sql
CREATE TABLE `YourTable` (
  ...
) WITH (
  ...
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  ...
);
```
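For illustration only, the same options with hypothetical values filled in (the instance ID ckafka-o1234567, the username flink_user, and the password are placeholders, not real credentials):

```sql
-- Hypothetical values for illustration; replace with your own instance ID, username, and password.
CREATE TABLE `kafka_sasl_sink_table` (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json',
  -- username is "<instance ID>#<username you set>"; password is the one you configured.
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-o1234567#flink_user" password="your_password";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN'
);
```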
In the sasl.jaas.config above, username should be the instance ID + # + the username you set, and password is the password you configured.

For Kerberos authentication, obtain the krb5.conf and emr.keytab files. On an EMR cluster, they can be found at the following paths:

```
/etc/krb5.conf
/var/krb5kdc/emr.keytab
```
Package krb5.conf and emr.keytab into a JAR file:

```shell
jar cvf kafka-xxx.jar krb5.conf emr.keytab
```

Check the JAR structure (you can run vim kafka-xxx.jar). Make sure the JAR file includes the following information and has the correct structure:

```
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
```
```shell
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first principal: hadoop/172.28.28.51@EMR-OQPO48B9):
# KVNO Timestamp           Principal
# ---- ------------------- ------------------------------------------------------
#    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
#    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
#    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
#    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
```
```sql
CREATE TABLE `YourTable` (
  ...
) WITH (
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'hadoop',
  ...
);
```
The value of properties.sasl.kerberos.service.name must match the principal you use. For example, if you use hadoop/${IP}@EMR-OQPO48B9, the parameter value should be hadoop.

In addition, configure the following Kerberos settings in the job configuration:

```yaml
security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
security.kerberos.login.contexts: KafkaClient
fs.hdfs.hadoop.security.authentication: kerberos
```
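For reference, a sketch of a complete Upsert Kafka sink DDL combining these Kerberos properties (the topic name and broker address are placeholders):

```sql
-- A sketch combining the Upsert Kafka sink with the Kerberos properties above.
-- Topic name and broker address are placeholders.
CREATE TABLE kafka_kerberos_sink_table (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'hadoop'  -- Must match the service name of the principal you use.
);
```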