Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 |
1.14 | 支持 |
1.16 | 支持 |
CREATE TABLE kafka_upsert_sink_table (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (-- 定义 Upsert Kafka 参数'connector' = 'upsert-kafka', -- 选择 connector'topic' = 'topic', -- 替换为您要消费的 Topic'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址'key.format' = 'json', -- 定义 key 数据格式'value.format' = 'json' -- 定义value 数据格式);
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
connector | 必选 | (none) | String | 指定要使用的连接器,Upsert Kafka 连接器使用: 'upsert-kafka' 。 |
topic | 必选 | (none) | String | 用于读取和写入的 Kafka topic 名称。 |
properties.bootstrap.servers | 必选 | (none) | String | 以逗号分隔的 Kafka brokers 列表。 |
properties.* | 可选 | (none) | String | Flink 会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,您可以通过 'properties.allow.auto.create.topics' = 'false
来禁止自动创建 topic。 但是,某些选项,例如 'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。 |
key.format | 必选 | (none) | String | 用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv' 、'json' 、'avro' 。 |
key.fields-prefix | optional | (none) | String | 为'key.fields'的所有字段定义自定义前缀,以避免与 'value.fields' 字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表 schema 和 'key.fields' 将使用前缀名称。构建'key.fields'格式的数据类型时候,将删除前缀并使用 key format 中非前缀名称。请注意,此选项要求 'value.fields-include' 必须设置为 'EXCEPT_KEY' 。 |
value.format | 必选 | (none) | String | 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv' 、'json' 、'avro' 。 |
value.fields-include | 可选 | 'ALL' | String | 控制哪些字段应该出现在 value 中。可取值: ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。 EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
sink.parallelism | 可选 | (none) | Integer | 定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 |
sink.buffer-flush.max-rows | 可选 | 0 | Integer | 缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows' 和 'sink.buffer-flush.interval' 两个选项为大于零的值。 |
sink.buffer-flush.interval | 可选 | 0 | Duration | 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows' 和 'sink.buffer-flush.interval' 两个选项为大于零的值。 |
CREATE TABLE `kafka_json_source_table` (`id` INT,`name` STRING) WITH (-- 定义 Kafka 参数'connector' = 'kafka','topic' = 'Data-Input', -- 替换为您要消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID-- 定义数据格式 (JSON 格式)'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);CREATE TABLE kafka_upsert_sink_table (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (-- 定义 Upsert Kafka 参数'connector' = 'upsert-kafka', -- 选择 connector'topic' = 'topic', -- 替换为您要消费的 Topic'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址'key.format' = 'json', -- 定义 key 数据格式'value.format' = 'json' -- 定义value 数据格式);-- 计算 pv、uv 并插入到 upsert-kafka sinkINSERT INTO kafka_upsert_sink_tableSELECT * FROM kafka_json_source_table;
CREATE TABLE `YourTable` (...) WITH (...'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN',...);
username
是实例 ID
+ #
+ 刚配置的用户名
,password
是刚配置的用户密码。/etc/krb5.conf/var/krb5kdc/emr.keytab
jar cvf kafka-xxx.jar krb5.conf emr.keytab
META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.conf
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
CREATE TABLE `YourTable` (...) WITH (...'properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'GSSAPI','properties.sasl.kerberos.service.name' = 'hadoop',...);
properties.sasl.kerberos.service.name
的值必须与您选取的 principal 匹配,如果您选择的为 hadoop/${IP}@EMR-OQPO48B9
,那么取值为 hadoop。security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FDsecurity.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.confsecurity.kerberos.login.contexts: KafkaClientfs.hdfs.hadoop.security.authentication: kerberos
本页内容是否解决了您的问题?