tencent cloud

文档反馈

消息队列 Upsert Kafka

最后更新时间:2023-11-08 11:20:52

    介绍

    Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
    作为 Source,Upsert Kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
    作为 Sink,Upsert Kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

    版本说明

    Flink 版本
    说明
    1.11
    不支持
    1.13
    支持
    1.14
    支持
    1.16
    支持

    DDL 定义

    CREATE TABLE kafka_upsert_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    -- 定义 Upsert Kafka 参数
    'connector' = 'upsert-kafka', -- 选择 connector
    'topic' = 'topic', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址
    'key.format' = 'json', -- 定义 key 数据格式
    'value.format' = 'json' -- 定义value 数据格式
    );
    说明
    Upsert Kafka 确保在 DDL 中定义主键。

    WITH 参数

    参数
    是否必选
    默认值
    数据类型
    描述
    connector
    必选
    (none)
    String
    指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'
    topic
    必选
    (none)
    String
    用于读取和写入的 Kafka topic 名称。
    properties.bootstrap.servers
    必选
    (none)
    String
    以逗号分隔的 Kafka brokers 列表。
    properties.*
    可选
    (none)
    String
    该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档 中的参数名。
    Flink 会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,您可以通过 'properties.allow.auto.create.topics' = 'false 来禁止自动创建 topic。 但是,某些选项,例如 'key.deserializer''value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
    key.format
    必选
    (none)
    String
    用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv''json''avro'
    key.fields-prefix
    optional
    (none)
    String
    为'key.fields'的所有字段定义自定义前缀,以避免与 'value.fields' 字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表 schema 和 'key.fields' 将使用前缀名称。构建'key.fields'格式的数据类型时候,将删除前缀并使用 key format 中非前缀名称。请注意,此选项要求 'value.fields-include'必须设置为 'EXCEPT_KEY'
    value.format
    必选
    (none)
    String
    用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv''json''avro'
    value.fields-include
    可选
    'ALL'
    String
    控制哪些字段应该出现在 value 中。可取值:
    ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。
    EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
    sink.parallelism
    可选
    (none)
    Integer
    定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
    sink.buffer-flush.max-rows
    可选
    0
    Integer
    缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows''sink.buffer-flush.interval' 两个选项为大于零的值。
    sink.buffer-flush.interval
    可选
    0
    Duration
    缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。
    可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows''sink.buffer-flush.interval' 两个选项为大于零的值。

    代码示例

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
    );
    
    CREATE TABLE kafka_upsert_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    -- 定义 Upsert Kafka 参数
    'connector' = 'upsert-kafka', -- 选择 connector
    'topic' = 'topic', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址
    'key.format' = 'json', -- 定义 key 数据格式
    'value.format' = 'json' -- 定义value 数据格式
    );
    
    -- 计算 pv、uv 并插入到 upsert-kafka sink
    INSERT INTO kafka_upsert_sink_table
    SELECT * FROM kafka_json_source_table;

    SASL 认证授权

    SASL/PLAIN 用户名密码认证授权

    1. 参考 消息队列 CKafka - 配置 ACL 策略,设置 Topic 按用户名密码访问的 SASL_PLAINTEXT 认证方式。
    2. 参考 消息队列 CKafka - 添加路由策略,选择 SASL_PLAINTEXT 接入方式,并以该接入方式下的网络地址访问 Topic。
    3. 作业配置 with 参数。
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    ...
    );
    说明
    username实例 ID + # + 刚配置的用户名password 是刚配置的用户密码。

    SASL/GSSAPI Kerberos 认证授权

    腾讯云 CKafka 暂时不支持 Kerberos 认证,您的自建 Kafka 如果开启了 Kerberos 认证,可参考如下步骤配置作业。
    1. 获取您的自建 Kafka 集群的 Kerberos 配置文件,如果您基于腾讯云 EMR 集群自建,获取 krb5.conf、emr.keytab 文件,路径如下。
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. 对步骤1中获取的文件打 jar 包。
    jar cvf kafka-xxx.jar krb5.conf emr.keytab
    3. 校验 jar 的结构(可以通过 vim 命令查看 vim kafka-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
    5. 获取 kerberos principal,用于作业 高级参数 配置。
    klist -kt /var/krb5kdc/emr.keytab
    
    # 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. 作业 with 参数配置。
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.sasl.kerberos.service.name' = 'hadoop',
    ...
    );
    说明:
    参数 properties.sasl.kerberos.service.name 的值必须与您选取的 principal 匹配,如果您选择的为 hadoop/${IP}@EMR-OQPO48B9,那么取值为 hadoop。
    7. 作业 高级参数 配置。
    security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    security.kerberos.login.contexts: KafkaClient
    fs.hdfs.hadoop.security.authentication: kerberos
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持