Tencent Cloud

Stream Compute Service
Last updated: 2024-08-22 10:43:58
HBase

Overview

The HBase connector supports reading from and writing to an HBase cluster. Stream Compute Service provides the flink-connector-hbase connector as a built-in component.

Versions

Flink v1.11: HBase v1.4.x
Flink v1.13: HBase v1.4.x, v2.2.x, and v2.3.x
Flink v1.14: HBase v1.4.x and v2.2.x
Flink v1.16: HBase v1.4.x and v2.2.x

Use cases

The HBase connector can be used as a source table, as a dimension table, or as a sink table for tuple (append-only) and upsert streams.
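As a minimal sketch of the source use case, the job below reads an HBase table and prints each row. The HBase table `hbase_source_table` (with column family `cf` and qualifier `school_name`) is hypothetical, the ZooKeeper addresses are placeholders, and the job assumes the `print` connector is available in your Flink version. It uses `hbase-2.2`, which requires Flink v1.13 or later; use `hbase-1.4` on Flink v1.11.

```sql
-- Hypothetical HBase source: row key plus one column family `cf`.
CREATE TABLE hbase_source (
  rowkey INT,
  cf ROW<school_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'hbase_source_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);

-- Print sink (assumed available) used only to inspect the rows read.
CREATE TABLE print_sink (
  id INT,
  school_name STRING
) WITH (
  'connector' = 'print'
);

-- Flatten the column family into top-level columns.
INSERT INTO print_sink
SELECT rowkey, cf.school_name FROM hbase_source;
```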

Defining a table in DDL

CREATE TABLE hbase_table (
  rowkey INT, -- The HBase row key.
  cf ROW<school_name STRING>, -- Column family cf with the qualifier school_name.
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4', -- Use 'hbase-2.2' for HBase 2.x (Flink v1.13 or later).
  'table-name' = 'hbase_sink_table', -- The name of the HBase table.
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- The ZooKeeper quorum of the HBase cluster.
);

WITH parameters

connector (required): The connector to use. Valid values: hbase-1.4 and hbase-2.2. If HBase 2.3.x is used, set it to hbase-2.2.
table-name (required): The name of the HBase table.
zookeeper.quorum (required): The ZooKeeper quorum of the HBase cluster, given as comma-separated ip:port addresses.
zookeeper.znode.parent (optional): The root znode of the HBase cluster in ZooKeeper.
null-string-literal (optional, default: null): The string written to HBase when a Flink field of type STRING is null.
sink.buffer-flush.max-size (optional, default: 2 MB): The maximum size of data, in bytes, buffered in memory before it is written to HBase. Supported units are B, KB, MB, and GB (case-insensitive). A larger value can improve write throughput but increases write latency and memory usage. Setting it to 0 means no data is cached. Applies only when the connector is used as a sink.
sink.buffer-flush.max-rows (optional, default: 1000): The maximum number of rows buffered in memory before they are written to HBase. A larger value can improve write throughput but increases write latency and memory usage. Setting it to 0 means no data is cached. Applies only when the connector is used as a sink.
sink.buffer-flush.interval (optional, default: 1s): The interval at which buffered data is flushed to HBase, which bounds the write delay. Supported units are ms, s, min, h, and d. Setting it to 0 disables periodic flushing. Applies only when the connector is used as a sink.
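The DDL below is a sketch of a sink table that also sets the optional options. The table name, the `/hbase` znode, the `N/A` null literal, and the buffer thresholds are illustrative values, not recommendations; tune them to your cluster. In the open-source connector this sink is built on, buffered rows are flushed as soon as any one of the three sink.buffer-flush thresholds is reached.

```sql
CREATE TABLE hbase_buffered_sink (
  rowkey STRING,
  cf ROW<school_name STRING, city STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',                     -- HBase 2.2.x or 2.3.x cluster
  'table-name' = 'hbase_sink_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port',
  'zookeeper.znode.parent' = '/hbase',           -- illustrative; use your cluster's root znode
  'null-string-literal' = 'N/A',                 -- illustrative; written instead of NULL for STRING fields
  'sink.buffer-flush.max-size' = '4mb',          -- flush once about 4 MB is buffered
  'sink.buffer-flush.max-rows' = '2000',         -- or once 2,000 rows are buffered
  'sink.buffer-flush.interval' = '2s'            -- or at least every 2 seconds
);
```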

Data type mapping

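HBase stores all data as byte arrays, so Flink values must be serialized on write and deserialized on read. The connector uses the utility class org.apache.hadoop.hbase.util.Bytes provided by HBase for these conversions. The Flink data types map to HBase as follows: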
Flink type: serialization / deserialization
CHAR / VARCHAR / STRING: byte[] toBytes(String s) / String toString(byte[] b)
BOOLEAN: byte[] toBytes(boolean b) / boolean toBoolean(byte[] b)
BINARY / VARBINARY: byte[] (stored as is)
DECIMAL: byte[] toBytes(BigDecimal v) / BigDecimal toBigDecimal(byte[] b)
TINYINT: new byte[] { val } / bytes[0]
SMALLINT: byte[] toBytes(short val) / short toShort(byte[] bytes)
INT: byte[] toBytes(int val) / int toInt(byte[] bytes)
BIGINT: byte[] toBytes(long val) / long toLong(byte[] bytes)
FLOAT: byte[] toBytes(float val) / float toFloat(byte[] bytes)
DOUBLE: byte[] toBytes(double val) / double toDouble(byte[] bytes)
DATE: converted to the number of days since January 1, 1970 as an int, then serialized with byte[] toBytes(int val)
TIME: converted to the number of milliseconds since 00:00:00 as an int, then serialized with byte[] toBytes(int val)
TIMESTAMP: converted to the number of milliseconds since 00:00:00 on January 1, 1970 as a long, then serialized with byte[] toBytes(long val)
ARRAY: unsupported
MAP / MULTISET: unsupported
ROW: unsupported
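To illustrate the mapping, the hypothetical table below declares qualifiers of several types. Each top-level ROW field is an HBase column family and each of its fields is a qualifier, so the unsupported ROW entry applies to qualifier values, not to the column family declarations themselves. The table, family, and qualifier names are assumptions; the encodings noted in the comments follow the mapping above.

```sql
CREATE TABLE hbase_typed_table (
  rowkey INT,                      -- row key: byte[] toBytes(int val)
  info ROW<
    name STRING,                   -- toBytes(String s)
    active BOOLEAN,                -- toBytes(boolean b)
    score DOUBLE                   -- toBytes(double val)
  >,
  stats ROW<
    visits BIGINT,                 -- toBytes(long val)
    balance DECIMAL(10, 2),        -- toBytes(BigDecimal v)
    birthday DATE,                 -- days since 1970-01-01 as an int
    updated_at TIMESTAMP(3)        -- milliseconds since the epoch as a long
  >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'hbase_typed_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);
```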

Example

The following example shows a job that uses an HBase table as a dimension table: each row from a datagen source is joined, at processing time, with the HBase row whose rowkey matches its id.
CREATE TABLE datagen_source_table (
  id INT,
  name STRING,
  `proc_time` AS PROCTIME() -- Processing-time attribute required by the lookup join.
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE hbase_table (
  rowkey INT,
  cf ROW<school_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4', -- Use 'hbase-2.2' for HBase 2.x (Flink v1.13 or later).
  'table-name' = 'hbase_sink_table', -- The name of the HBase table.
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- The ZooKeeper quorum of the HBase cluster.
);

CREATE TABLE blackhole_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'blackhole'
);

-- Look up each source row in HBase by rowkey at processing time.
INSERT INTO blackhole_sink
SELECT src.id, h.cf.school_name AS name
FROM datagen_source_table AS src
JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` AS h
  ON src.id = h.rowkey;

Notes

The HBase connector uses the primary key defined in the DDL and works in upsert mode to exchange changelogs with external systems. The primary key must therefore be defined on the rowkey field, which must be declared in the DDL. If no PRIMARY KEY clause is declared, the connector uses the rowkey field as the primary key.
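As a minimal sketch of upsert writes, the job below counts rows per id and writes the running count to HBase. Because the aggregation emits a new result each time a count changes, every write is keyed on rowkey and the latest count replaces the previous value in the same HBase row. The table and column names are hypothetical, the HBase table `hbase_count_table` with column family `cf` is assumed to exist already, and the datagen bounds only keep the key space small so that updates occur.

```sql
CREATE TABLE datagen_source_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '1',   -- small key space so counts are updated repeatedly
  'fields.id.max' = '10'
);

-- rowkey is declared and used as the primary key, as the connector requires.
CREATE TABLE hbase_count_sink (
  rowkey INT,
  cf ROW<cnt BIGINT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'hbase_count_table',
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'
);

-- The GROUP BY produces an updating stream; each update is written
-- to HBase as an upsert keyed on rowkey.
INSERT INTO hbase_count_sink
SELECT id, ROW(cnt)
FROM (
  SELECT id, COUNT(*) AS cnt
  FROM datagen_source_table
  GROUP BY id
) AS t;
```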