tencent cloud

文档反馈

数据库 HBase

最后更新时间:2024-08-22 10:44:12

    介绍

    HBase Connector 提供了对 HBase 集群的读写支持。Oceanus 已经提供了内置的flink-connector-hbase Connector 组件。

    版本说明

    Flink 版本
    说明
    1.11
    支持 hbase 版本为:1.4.x
    1.13
    支持 hbase 版本为:1.4.x、2.2.x、2.3.x
    1.14
    支持 hbase 版本为:1.4.x、2.2.x
    1.16
    支持 hbase 版本为: 1.4.x、2.2.x

    使用范围

    可以作为源表,维表,以及Tuple、Upsert 数据流的目的表。

    DDL 定义

    CREATE TABLE hbase_table (
    rowkey INT,
    cf ROW < school_name STRING >,
    PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
    'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
    'table-name' = 'hbase_sink_table', -- Hbase 表名
    'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
    );

    WITH 参数

    参数
    说明
    是否必填
    备注
    connector
    表类型
    hbase-1.4 或者 hbase-2.2
    如果您用了 hbase 2.3.x 版本,那么,connector 参数值需要替换为 hbase-2.2
    table-name
    HBase 表名
    -
    zookeeper.quorum
    HBase 的 zookeeper 地址
    -
    zookeeper.znode.parent
    HBase 在 zookeeper 中的根目录
    -
    null-string-literal
    HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 null-string-literal,并写入 HBase
    默认为 null
    sink.buffer-flush.max-size
    写入 HBase 前,内存中缓存的数据量(字节)大小。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
    默认值为2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为0表示不进行缓存
    sink.buffer-flush.max-rows
    写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
    默认值为1000,设置为0表示不进行缓存
    sink.buffer-flush.interval
    将缓存数据周期性写入到 HBase 的间隔,可以控制写入 HBase 的延迟。仅作为 Sink 时使用
    默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0表示关闭定期写入

    类型映射

    HBase 将所有的数据存为字节数组。读写操作时需要将数据进行序列化和反序列化。Flink 与 HBase 的数据转换关系如下:
    Flink 字段类型
    HBase 转换
    CHAR / VARCHAR / STRING
    byte[] toBytes(String s) String toString(byte[] b)
    BOOLEAN
    byte[] toBytes(boolean b)boolean toBoolean(byte[] b)
    BINARY / VARBINARY
    byte[]
    DECIMAL
    byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b)
    TINYINT
    new byte[] { val } bytes[0]
    SMALLINT
    byte[] toBytes(short val)short toShort(byte[] bytes)
    INT
    byte[] toBytes(int val)int toInt(byte[] bytes)
    BIGINT
    byte[] toBytes(long val)long toLong(byte[] bytes)
    FLOAT
    byte[] toBytes(float val)float toFloat(byte[] bytes)
    DOUBLE
    byte[] toBytes(double val)double toDouble(byte[] bytes)
    DATE
    将日期转换成自1970.01.01以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
    TIME
    将时间转换成自00:00:00以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
    TIMESTAMP
    将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组
    ARRAY
    不支持
    MAP / MULTISET
    不支持
    ROW
    不支持

    代码示例

    包含 HBase 维表的实时计算作业代码,示例如下:
    CREATE TABLE datagen_source_table (
    id INT,
    name STRING,
    `proc_time` AS PROCTIME()
    ) with (
    'connector'='datagen',
    'rows-per-second'='1'
    );
    
    CREATE TABLE hbase_table (
    rowkey INT,
    cf ROW < school_name STRING >,
    PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
    'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
    'table-name' = 'hbase_sink_table', -- Hbase 表名
    'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
    );
    
    CREATE TABLE blackhole_sink(
    id INT,
    name STRING
    ) with (
    'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT id, cf.school_name as name FROM datagen_source_table src
    JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` as h ON src.id = h.rowkey;

    注意事项

    HBase Connector 一般会使用 DDL 语句中定义的主键,以 upsert 模式工作,与外部系统交换变更日志信息。因此,必须在 HBase 的 rowkey 字段上定义主键(必须声明 rowkey 字段)。如果未声明 PRIMARY KEY 子句,则 HBase 连接器默认将 rowkey 作为主键。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持