tencent cloud

Last updated: 2024-08-22 10:43:58

    Overview

    The HBase connector supports reading from and writing to an HBase cluster. Stream Compute Service provides flink-connector-hbase as a built-in connector component.

    Versions

    | Flink Version | Description |
    | --- | --- |
    | 1.11 | HBase v1.4.x supported. |
    | 1.13 | HBase v1.4.x, v2.2.x, and v2.3.x supported. |
    | 1.14 | HBase v1.4.x and v2.2.x supported. |
    | 1.16 | HBase v1.4.x and v2.2.x supported. |

    Use cases

    The HBase connector can be used as a source or a dimension table, and as a sink of tuple and upsert streams.

    Defining a table in DDL

    CREATE TABLE hbase_table (
      rowkey INT,
      cf ROW<school_name STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector' = 'hbase-1.4', -- Use 'hbase-2.2' for HBase v2.2.x (Flink v1.13 and later).
      'table-name' = 'hbase_sink_table', -- The name of the HBase table.
      'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- The ZooKeeper address of the HBase cluster.
    );
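
    To make the column mapping concrete, the sketch below assumes a hypothetical HBase table named student_table with two column families, info and score; the table, family, and qualifier names are illustrative only. In the Flink HBase connector, each ROW column stands for one column family and each field inside the ROW for one column qualifier, so a qualifier is read as family.qualifier.

```sql
-- Hypothetical example: all table, family, and qualifier names are assumptions.
CREATE TABLE student_dim (
  rowkey INT,
  info ROW<school_name STRING, city STRING>, -- column family "info"
  score ROW<total BIGINT>,                   -- column family "score"
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'student_table',            -- assumed HBase table name
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);

-- Scan the table as a source; qualifiers are addressed as family.qualifier.
SELECT rowkey, info.school_name, score.total
FROM student_dim;
```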

    WITH parameters

    | Option | Description | Required | Remarks |
    | --- | --- | --- | --- |
    | connector | The connector to use. | Yes | hbase-1.4 or hbase-2.2. If HBase 2.3.x is used, set connector to hbase-2.2. |
    | table-name | The name of the HBase table. | Yes | - |
    | zookeeper.quorum | The ZooKeeper address of the HBase cluster. | Yes | - |
    | zookeeper.znode.parent | The root directory of the HBase cluster in ZooKeeper. | No | - |
    | null-string-literal | When an HBase field is of the string type and the value of the Flink field is null, the value of null-string-literal is written to the HBase table for this field. | No | Default value: null. |
    | sink.buffer-flush.max-size | The size of data (in bytes) to cache in memory before writing to HBase. A larger value can improve HBase write performance, but it also increases write latency and memory usage. This option applies only when the HBase connector is used as a sink. | No | Default value: 2 MB. Supported units: B, KB, MB, and GB (case-insensitive). Setting it to 0 means no data is cached. |
    | sink.buffer-flush.max-rows | The number of records to cache in memory before writing to HBase. A larger value can improve HBase write performance, but it also increases write latency and memory usage. This option applies only when the HBase connector is used as a sink. | No | Default value: 1000. Setting it to 0 means no data is cached. |
    | sink.buffer-flush.interval | The interval at which cached data is written to HBase. Use it to control the write delay. This option applies only when the HBase connector is used as a sink. | No | Default value: 1s. Supported units: ms, s, min, h, and d. Setting it to 0 disables periodic writes. |
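
    As an illustration of how the optional parameters fit together, the following sketch declares a sink table that overrides the buffering defaults. The values and the /hbase znode path are assumptions chosen for the example, not recommendations; larger buffers trade write latency and memory for throughput, as described in the table above.

```sql
-- Sketch of a sink table with tuned write buffering; all values are illustrative.
CREATE TABLE hbase_sink (
  rowkey INT,
  cf ROW<school_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'hbase_sink_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port',
  'zookeeper.znode.parent' = '/hbase',   -- assumed ZooKeeper root directory
  'null-string-literal' = 'null',        -- written when a STRING field is NULL
  'sink.buffer-flush.max-size' = '4mb',  -- size threshold (default 2 MB)
  'sink.buffer-flush.max-rows' = '2000', -- row-count threshold (default 1000)
  'sink.buffer-flush.interval' = '2s'    -- periodic flush (default 1s)
);
```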

    Data type mapping

    HBase stores everything as bytes, so data must be serialized on write and deserialized on read. The Flink and HBase data types are mapped as follows (write conversion / read conversion):

    | Flink Type | HBase Type |
    | --- | --- |
    | CHAR / VARCHAR / STRING | byte[] toBytes(String s) / String toString(byte[] b) |
    | BOOLEAN | byte[] toBytes(boolean b) / boolean toBoolean(byte[] b) |
    | BINARY / VARBINARY | byte[] |
    | DECIMAL | byte[] toBytes(BigDecimal v) / BigDecimal toBigDecimal(byte[] b) |
    | TINYINT | new byte[] { val } / bytes[0] |
    | SMALLINT | byte[] toBytes(short val) / short toShort(byte[] bytes) |
    | INT | byte[] toBytes(int val) / int toInt(byte[] bytes) |
    | BIGINT | byte[] toBytes(long val) / long toLong(byte[] bytes) |
    | FLOAT | byte[] toBytes(float val) / float toFloat(byte[] bytes) |
    | DOUBLE | byte[] toBytes(double val) / double toDouble(byte[] bytes) |
    | DATE | Converts the date to the number of days since January 1, 1970, expressed as an int, and then converts it to byte[] using byte[] toBytes(int val). |
    | TIME | Converts the time to the number of milliseconds since 00:00:00, expressed as an int, and then converts it to byte[] using byte[] toBytes(int val). |
    | TIMESTAMP | Converts the timestamp to the number of milliseconds since 00:00:00 on January 1, 1970, expressed as a long, and then converts it to byte[] using byte[] toBytes(long val). |
    | ARRAY | Unsupported |
    | MAP / MULTISET | Unsupported |
    | ROW | Unsupported |
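
    The mapping above can be exercised with a table whose qualifiers use several of the supported types. The sketch below is hypothetical: the table name hbase_typed_table and all qualifier names are assumptions. Because values are stored as raw bytes, the declared Flink type is expected to match the encoding that was used when the cell was written.

```sql
-- Hypothetical table: names are assumptions, types come from the mapping table.
CREATE TABLE hbase_typed_table (
  rowkey STRING,
  cf ROW<
    is_active BOOLEAN,       -- toBytes(boolean) / toBoolean(byte[])
    price DECIMAL(10, 2),    -- toBytes(BigDecimal) / toBigDecimal(byte[])
    visits BIGINT,           -- toBytes(long) / toLong(byte[])
    birthday DATE,           -- days since 1970-01-01, stored as int
    updated_at TIMESTAMP(3)  -- milliseconds since the epoch, stored as long
  >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'hbase_typed_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);

-- Values are deserialized according to the declared types.
SELECT rowkey, cf.price, cf.birthday, cf.updated_at
FROM hbase_typed_table;
```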

    Example

    The following example shows a real-time computing job that joins a data stream with an HBase dimension table.
    -- Source stream with a processing-time attribute for the dimension table join.
    CREATE TABLE datagen_source_table (
      id INT,
      name STRING,
      `proc_time` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    
    -- HBase dimension table.
    CREATE TABLE hbase_table (
      rowkey INT,
      cf ROW<school_name STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector' = 'hbase-1.4', -- Use 'hbase-2.2' for HBase v2.2.x (Flink v1.13 and later).
      'table-name' = 'hbase_sink_table', -- The name of the HBase table.
      'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- The ZooKeeper address of the HBase cluster.
    );
    
    CREATE TABLE blackhole_sink (
      id INT,
      name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    -- Look up each source row in HBase by rowkey.
    INSERT INTO blackhole_sink
    SELECT id, cf.school_name AS name
    FROM datagen_source_table AS src
    JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` AS h
    ON src.id = h.rowkey;

    Notes

    The HBase connector generally works in upsert mode and uses the primary key defined in the DDL statement to exchange changelog messages with the external system. The primary key must therefore be defined on the rowkey field, and the rowkey field must be declared. If no PRIMARY KEY clause is declared, the HBase connector uses rowkey as the primary key by default.
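
    A minimal sketch of the upsert behavior, assuming a hypothetical datagen source named visit_events and a hypothetical HBase table named visit_counts: the aggregation produces an updating stream keyed by id, and each update is expected to overwrite the HBase row whose rowkey equals that id.

```sql
-- Assumed source of events; datagen is used purely for illustration.
CREATE TABLE visit_events (
  id INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '1',
  'fields.id.max' = '10'
);

-- Hypothetical HBase sink; the primary key is declared on rowkey.
CREATE TABLE hbase_counts (
  rowkey INT,
  cf ROW<visits BIGINT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'visit_counts',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);

-- GROUP BY emits an upsert stream; the latest count per id replaces the old one.
INSERT INTO hbase_counts
SELECT rowkey, ROW(visits)
FROM (
  SELECT id AS rowkey, COUNT(*) AS visits
  FROM visit_events
  GROUP BY id
);
```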