tencent cloud

文档反馈

消息队列 Kafka

最后更新时间:2023-11-08 14:59:14

    介绍

    Kafka 数据管道是流计算系统中最常用的数据源(Source)和数据目的(Sink)。用户可以把流数据导入到 Kafka 的某个 Topic 中,通过 Flink 算子进行处理后,输出到相同或不同 Kafka 示例的另一个 Topic。
    Kafka 支持同一个 Topic 多分区读写,数据可以从多个分区读入,也可以写入到多个分区,以提供更高的吞吐量,减少数据倾斜和热点。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持
    1.14
    支持
    1.16
    支持

    使用范围

    Kafka 支持用作数据源表(Source),也可以作为 Tuple 数据流的目的表(Sink)。
    Kafka 还可以与 DebeziumCanal 等联用,对 MySQL、PostgreSQL 等传统数据库的变更进行捕获和订阅,然后 Flink 即可对这些变更事件进行进一步的处理。

    DDL 定义

    用作数据源(Source)

    JSON 格式输入

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
    );

    CSV 格式输入

    CREATE TABLE `kafka_csv_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    
    -- 定义数据格式 (CSV 格式)
    'format' = 'csv'
    );

    Debezium 格式输入

    CREATE TABLE `kafka_debezium_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    
    -- 定义数据格式 (Debezium 输出的 JSON 格式)
    'format' = 'debezium-json'
    );

    Canal 格式输入

    CREATE TABLE `kafka_source`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
    origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,
    `batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,
    `is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,
    origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    `sql` STRING METADATA FROM 'value.sql' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
    ) WITH (
    'connector' = 'kafka', -- 注意选择对应的内置 Connector
    'topic' = '$TOPIC', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '$IP:$PORT', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    'scan.startup.mode' = 'latest-offset',
    'scan.topic-partition-discovery.interval' = '5s',
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'false', -- 忽略 JSON 结构解析异常
    'canal-json.source.append-mode' = 'true' -- 仅支持Flink1.13及以上版本
    );

    用作数据目的(Sink)

    JSON 格式输出

    CREATE TABLE `kafka_json_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- 替换为您要写入的 Topic
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
    );

    CSV 格式输出

    CREATE TABLE `kafka_csv_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- 替换为您要写入的 Topic
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    
    -- 定义数据格式 (CSV 格式)
    'format' = 'csv'
    );

    Canal 格式输出

    CREATE TABLE `kafka_canal_json_sink_table`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database',
    origin_table STRING METADATA FROM 'value.table',
    origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',
    `type` STRING METADATA FROM 'value.operation-type',
    `batch_id` bigint METADATA FROM 'value.batch-id',
    `isDdl` BOOLEAN METADATA FROM 'value.is-ddl',
    `old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
    ) WITH (
    'connector' = 'kafka', -- 注意选择对应的内置 Connector
    'topic' = '$TOPIC', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '$IP:$PORT', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    'format' = 'canal-json'
    );

    WITH 参数

    参数值
    必填
    默认值
    描述
    connector
    固定值为 'kafka'
    topic
    要读写的 Kafka Topic 名。
    properties.bootstrap.servers
    逗号分隔的 Kafka Bootstrap 地址。
    properties.group.id
    作为数据源时必选
    Kafka 消费时的 Group ID。
    format
    Kafka 消息的输入输出格式。目前支持 csvjsonavrodebezium-jsoncanal-json,Flink1.13支持 maxwell-json
    scan.startup.mode
    group-offsets
    Kafka consumer 的启动模式。可以是 latest-offsetearliest-offsetspecific-offsetsgroup-offsetstimestamp 的任何一种。
    'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',使用 'specific-offsets' 启动模式时需要指定每个 partition 对应的 offsets。
    'scan.startup.timestamp-millis' = '1631588815000',使用 'timestamp' 启动模式时需要指定启动的时间戳(单位毫秒)。
    scan.startup.specific-offsets
    如果 scan.startup.mode 的值为'specific-offsets',则必须使用本参数指定具体起始读取的偏移量。例如 'partition:0,offset:42;partition:1,offset:300'
    scan.startup.timestamp-millis
    如果scan.startup.mode 的值为'timestamp',则必须使用本参数来指定开始读取的时间点(毫秒为单位的 Unix 时间戳)。
    sink.partitioner
    Kafka 输出时所用的分区器。目前支持的分区器如下:
    fixed:一个 Flink 分区对应不多于一个 Kafka 分区。
    round-robin:一个Flink 分区依次被分配到不同的 Kafka 分区。
    自定义分区:也可以通过继承 FlinkKafkaPartitioner 类,实现该逻辑。

    JSON 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    json.fail-on-missing-field
    false
    如果为 true,则遇到缺失字段时,会让作业失败。如果为 false(默认值),则只会把缺失字段设置为 null 并继续处理。
    json.ignore-parse-errors
    false
    如果为 true,则遇到解析异常时,会把这个字段设置为 null 并继续处理。如果为 false,则会让作业失败。
    json.timestamp-format.standard
    SQL
    指定 JSON 时间戳字段的格式,默认是 SQL(格式是yyyy-MM-dd HH:mm:ss.s{可选精度})。也可以选择 ISO-8601,格式是 yyyy-MM-ddTHH:mm:ss.s{可选精度}

    CSV 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    csv.field-delimiter
    ,
    指定 CSV 字段分隔符,默认是半角逗号。
    csv.line-delimiter
    U&'\\000A'
    指定 CSV 的行分隔符,默认是换行符\\n,SQL 中必须用U&'\\000A'表示。如果需要使用回车符\\r,SQL 中必须使用U&'\\000D'表示。
    csv.disable-quote-character
    false
    禁止字段包围引号。如果为 true,则 'csv.quote-character' 选项不可用。
    csv.quote-character
    "
    字段包围引号,引号内部的作为整体看待。默认是"
    csv.ignore-parse-errors
    false
    忽略处理错误。对于无法解析的字段,会输出为 null。
    csv.allow-comments
    false
    忽略 # 开头的注释行,并输出为空行(请务必将 csv.ignore-parse-errors 设为 true)。
    csv.array-element-delimiter
    ;
    数组元素的分隔符,默认是;
    csv.escape-character
    指定转义符,默认禁用转义。
    csv.null-literal
    将指定的字符串看作 null 值。

    Debezium-json 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    debezium-json.schema-include
    false
    设置 Debezium Kafka Connect 时,如果指定了'value.converter.schemas.enable'参数,那么 Debezium 发来的 JSON 数据里会包含 Schema 信息,该选项需要设置为 true。
    debezium-json.ignore-parse-errors
    false
    忽略处理错误。对于无法解析的字段,会输出为 null。
    debezium-json.timestamp-format.standard
    SQL
    指定 JSON 时间戳字段的格式,默认是 SQL(格式是 yyyy-MM-dd HH:mm:ss.s{可选精度})。也可以选择 ISO-8601,格式是yyyy-MM-ddTHH:mm:ss.s{可选精度}

    Canal 格式 WITH 参数

    参数值
    必填
    默认值
    描述
    canal-json.source.append-mode
    false
    设置为 true 时支持 append 流,例如,消费 kafka canal-json 数据到 hive,该参数仅支持 Flink1.13 集群
    debezium-json.ignore-parse-errors
    false
    忽略处理错误。对于无法解析的字段,会输出为 null。
    canal-json.*
    -

    Canal 格式支持的元数据(仅支持 Flink1.13 版本集群)

    以下元数据只能作为表定义中的只读(VIRTUAL)列,若元数据列与物理列冲突,元数据列可以使用meta.列名:
    数据类型
    描述
    database
    STRING NOT NULL
    包含该 Row 的数据库名称
    table
    STRING NOT NULL
    包含该 Row的表名称
    event-timestamp
    TIMESTAMP_LTZ(3) NOT NULL
    Row 在数据库中进行更改的时间
    batch-id
    BIGINT
    binlog 的批 id
    is-ddl
    BOOLEAN
    是否 DDL 语句
    mysql-type
    MAP
    数据表结构
    update-before
    ARRAY
    未修改前字段的值
    pk-names
    ARRAY
    主键字段名
    sql
    STRING
    暂时为空
    sql-type
    MAP
    sql_type 表的字段到 java 数据类型 ID 的映射
    ingestion-timestamp
    TIMESTAMP_LTZ(3) NOT NULL
    收到该 ROW 并处理的当前时间
    operation-type
    STRING
    数据库操作类型,例如 INSERT/DELETE 等

    代码示例

    Json 格式使用示列

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
    );
    CREATE TABLE `kafka_json_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- 替换为您要写入的 Topic
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
    
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
    );
    insert into kafka_json_sink_table select * from kafka_json_source_table;

    Canal 使用示例

    CREATE TABLE `source`
    (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(6),
    `database_name` string METADATA FROM 'value.database_name',
    `table_name` string METADATA FROM 'value.table_name',
    `op_ts` timestamp(3) METADATA FROM 'value.op_ts',
    `op_type` string METADATA FROM 'value.op_type',
    `batch_id` bigint METADATA FROM 'value.batch_id',
    `is_ddl` boolean METADATA FROM 'value.is_ddl',
    `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts',
    primary key (`aid`) not enforced
    ) WITH (
    'connector' = 'mysql-cdc' ,
    'append-mode' = 'true',
    'hostname' = '$IP',
    'port' = '$PORT',
    'username' = '$USERNAME',
    'password' = '$PASSWORD',
    'database-name' = 't_wr',
    'table-name' = 't1',
    'server-time-zone' = 'Asia/Shanghai',
    'server-id' = '5500-5510'
    );
    
    CREATE TABLE `kafka_canal_json_sink`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database',
    origin_table STRING METADATA FROM 'value.table',
    origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',
    `type` STRING METADATA FROM 'value.operation-type',
    `batch_id` bigint METADATA FROM 'value.batch-id',
    `isDdl` BOOLEAN METADATA FROM 'value.is-ddl',
    `old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
    )
    WITH (
    'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
    'topic' = 'TOPIC', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '$IP:$PORT', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    'format' = 'canal-json'
    );
    
    insert into kafka_canal_json_sink select * from source;
    CREATE TABLE `source`
    (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(3),
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
    origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,
    `batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,
    `is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,
    origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    `sql` STRING METADATA FROM 'value.sql' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    WATERMARK FOR `origin_es` AS `origin_es` - INTERVAL '5' SECOND
    ) WITH (
    'connector' = 'kafka', -- 注意选择对应的内置 Connector
    'topic' = '$TOPIC', -- 替换为您要消费的 Topic
    'properties.bootstrap.servers' = '$IP:PORT', -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
    'scan.startup.mode' = 'latest-offset',
    'scan.topic-partition-discovery.interval' = '10s',
    
    'format' = 'canal-json',
    'canal-json.source.append-mode' = 'true', -- 仅支持Flink1.13
    'canal-json.ignore-parse-errors' = 'false'
    );
    
    
    CREATE TABLE `kafka_canal_json` (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(9),
    origin_database STRING,
    origin_table STRING,
    origin_es TIMESTAMP(9),
    origin_type STRING,
    `batch_id` bigint,
    `is_ddl` boolean,
    origin_old ARRAY<MAP<STRING, STRING>>,
    `mysql_type` MAP<STRING, STRING>,
    origin_pk_names ARRAY<STRING>,
    `sql` STRING,
    origin_sql_type MAP<STRING, INT>,
    `ingestion_ts` TIMESTAMP(9),
    dt STRING,
    hr STRING
    ) PARTITIONED BY (dt, hr)
    with (
    'connector' = 'hive',
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='30 min',
    'sink.partition-commit.policy.kind'='metastore,success-file'
    );
    
    insert into kafka_canal_json select *,DATE_FORMAT(`origin_es`,'yyyy-MM-dd'),DATE_FORMAT(`origin_es`,'HH')
    from `source`;

    SASL 认证授权

    SASL/PLAIN 用户名密码认证授权

    1. 参考 消息队列 CKafka - 配置 ACL 策略,设置 Topic 按用户名密码访问的 SASL_PLAINTEXT 认证方式。
    2. 参考 消息队列 CKafka - 添加路由策略,选择 SASL_PLAINTEXT 接入方式,并以该接入方式下的网络地址访问 Topic。
    3. 作业配置 with 参数。
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    ...
    );
    说明
    username实例 ID + # + 刚配置的用户名password 是刚配置的用户密码。

    SASL/GSSAPI Kerberos 认证授权

    腾讯云 CKafka 暂时不支持 Kerberos 认证,您的自建 Kafka 如果开启了 Kerberos 认证,可参考如下步骤配置作业。
    1. 获取您的自建 Kafka 集群的 Kerberos 配置文件,如果您基于腾讯云 EMR 集群自建,获取 krb5.conf、emr.keytab 文件,路径如下。
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. 对步骤1中获取的文件打 jar 包。
    jar cvf kafka-xxx.jar krb5.conf emr.keytab
    3. 校验 jar 的结构(可以通过 vim 命令查看 vim kafka-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
    5. 获取 kerberos principal,用于作业 高级参数 配置。
    klist -kt /var/krb5kdc/emr.keytab
    
    # 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. 作业 with 参数配置。
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.sasl.kerberos.service.name' = 'hadoop',
    ...
    );
    7. 作业 高级参数 配置。
    security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    security.kerberos.login.contexts: KafkaClient
    fs.hdfs.hadoop.security.authentication: kerberos
    注意:
    历史 Oceanus 集群可能不支持该功能,您可通过 在线客服 联系我们升级集群管控服务,以支持 Kerberos 访问。
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持