tencent cloud

文档反馈

数据仓库 Kudu

最后更新时间:2023-11-08 16:02:05

    介绍

    Kudu Connector 提供了对 Kudu 的读写支持。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持
    1.14
    不支持
    1.16
    支持

    使用范围

    Kudu Connector 支持用作数据源表(Source,仅限于普通和维表 JOIN 的右表),也可以作为 Tuple 数据流的目的表(Sink),还可以作为 Upsert 数据流的目的表(Sink,需要包含主键)。

    DDL 定义

    用作数据源(Source)

    CREATE TABLE `kudu_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 指定Kudu连接参数
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
    'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如 default.TestTable1
    'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
    'kudu.primary-key-columns' = 'id', -- 可选参数,主键
    'kudu.operation-timeout' = '10000', -- 可选参数,插入超时时间
    'kudu.max-buffer-size' = '2000', -- 可选参数,buffer 大小
    'kudu.flush-interval' = '1000' -- 可选参数,刷新数据到 kudu 的时间间隔
    );

    用作数据目的(Tuple Sink)

    CREATE TABLE `kudu_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 指定Kudu连接参数
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
    'kudu.table' = 'TableName1', -- 替换为Kudu中对应的表,如 default.TestTable1
    'kudu.igonre-duplicate' = 'true' --可选参数,为 true 时会忽略主键重复的数据
    );

    用作数据目的(Upsert Sink)

    CREATE TABLE `kudu_upsert_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 指定 Kudu 连接参数
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
    'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如default.TestTable1
    'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
    'kudu.primary-key-columns' = 'id' -- 必选参数,主键。Upsert Sink 需要包含主键。
    );

    WITH 参数

    参数值
    必填
    默认值
    描述
    connector.type
    连接 Kudu 数据库时,需要填写 'kudu'
    kudu.masters
    Kudu 数据库 MasterServer 的连接地址。端口默认为7051。若使用腾讯云的 Kudu 组件,master 地址和端口可以在 弹性 MapReduce 控制台 的集群列表,单击集群 ID/名称,进入集群详情页,然后在集群服务 > Kudu > 操作 > 查看端口中找到对应的 master server IP 和端口
    kudu.table
    数据库表名。例如 Impala 创建的 kudu 内表一般为 impala::db_name.table_name,Java API 创建的 Kudu 表 db_name.tablename
    kudu.hash-columns
    Hash 键
    kudu.primary-key-columns
    主键
    kudu.replicas
    副本数量
    kudu.operation-timeout
    30000
    插入超时时间,单位为毫秒
    kudu.max-buffer-size
    1000
    默认为1000
    kudu.flush-interval
    1000
    默认为1000
    kudu.ignore-not-found
    false
    是否忽略未找到的数据
    kudu.ignore-duplicate
    false
    插入数据时是否会忽略主键重复的数据

    类型映射

    Flink 类型
    Kudu
    STRING
    STRING
    BOOLEAN
    BOOL
    TINYINT
    INT8
    SMALLINT
    INT16
    INT
    INT32
    BIGINT
    INT64
    FLOAT
    FLOAT
    DOUBLE
    DOUBLE
    BYTES
    BINARY
    TIMESTAMP(3)
    UNIXTIME_MICROS

    代码示例

    CREATE TABLE `kudu_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 指定Kudu连接参数
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
    'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如 default.TestTable1
    'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
    'kudu.primary-key-columns' = 'id', -- 可选参数,主键
    'kudu.operation-timeout' = '10000', -- 可选参数,插入超时时间
    'kudu.max-buffer-size' = '2000', -- 可选参数,buffer 大小
    'kudu.flush-interval' = '1000' -- 可选参数,刷新数据到 kudu 的时间间隔
    );
    
    CREATE TABLE `kudu_upsert_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- 指定 Kudu 连接参数
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
    'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如default.TestTable1
    'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
    'kudu.primary-key-columns' = 'id' -- 必选参数,主键。Upsert Sink 需要包含主键。
    );
    
    insert into kudu_upsert_sink_table select * from kudu_source_table;

    注意事项

    1. 若需要使用 Impala 查询 Kudu 数据库的表时,需确认是否已经创建了对应的外表。
    2. 非 Impala-shell 创建的表,默认在 Impala 中没有对应的外表,需创建对应的 Kudu 外表才能查到记录。
    3. Kudu 作为 Oceanus 的 Sink 端时,若 Kudu 中该表不存在,则会在 Kudu 中创建对应的内表。

    Kudu Kerberos 认证授权

    1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab文件,路径如下。
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. 对步骤1中获取的文件打 jar 包。
    jar cvf kudu-xxx.jar krb5.conf emr.keytab
    3. 校验 jar 的结构(可以通过 vim 命令查看 vim kudu-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
    5. 获取 kerberos principal,用于作业 高级参数 配置。
    klist -kt /var/krb5kdc/emr.keytab
    
    # 输出如下所示,选取第一个即可:hadoop/172.28.22.43@EMR-E4331BF2
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 07/06/2023 18:50:41 hadoop/172.28.22.43@EMR-E4331BF2
    2 07/06/2023 18:50:41 HTTP/172.28.22.43@EMR-E4331BF2
    2 07/06/2023 18:50:41 kudu/172.28.22.43@EMR-E4331BF2
    6. 作业 高级参数 配置。
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.22.43@EMR-E4331BF2
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    fs.hdfs.hadoop.security.authentication: kerberos
    注意:
    历史 Oceanus 集群可能不支持该功能,您可通过 在线客服 联系我们升级集群管控服务,以支持 Kerberos 访问。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持