flink-connector-hbase
Flink Version | Description |
1.11 | HBase v1.4.x supported. |
1.13 | HBase v1.4.x, v2.2.x, and v2.3.x supported. |
1.14 | HBase v1.4.x and v2.2.x supported. |
1.16 | HBase v1.4.x and v2.2.x supported. |
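The support matrix can be encoded as a small lookup to pick the connector value for a given Flink and HBase version. What follows is a minimal sketch. It assumes versions are passed as strings such as "1.13" and "2.3.5". It also applies the remark on the connector option that HBase 2.3.x uses hbase-2.2. HBaseConnectorChooser and its methods are hypothetical helpers, not part of Flink.

```java
import java.util.Optional;

// Hypothetical helper: encodes the version support matrix on this page; not a Flink API.
class HBaseConnectorChooser {

    // Reduces a version such as "2.3.5" to its major.minor part, "2.3".
    static String majorMinor(String version) {
        String[] parts = version.split("\\.");
        return parts.length >= 2 ? parts[0] + "." + parts[1] : version;
    }

    // Returns the 'connector' option for a Flink/HBase version pair, or empty if unsupported.
    static Optional<String> connectorFor(String flinkVersion, String hbaseVersion) {
        String hbase = majorMinor(hbaseVersion);
        switch (majorMinor(flinkVersion)) {
            case "1.11":
                return hbase.equals("1.4") ? Optional.of("hbase-1.4") : Optional.empty();
            case "1.13":
                if (hbase.equals("1.4")) return Optional.of("hbase-1.4");
                // On Flink 1.13, HBase 2.3.x is accessed through the hbase-2.2 connector.
                if (hbase.equals("2.2") || hbase.equals("2.3")) return Optional.of("hbase-2.2");
                return Optional.empty();
            case "1.14":
            case "1.16":
                if (hbase.equals("1.4")) return Optional.of("hbase-1.4");
                if (hbase.equals("2.2")) return Optional.of("hbase-2.2");
                return Optional.empty();
            default:
                return Optional.empty(); // Flink versions not listed on this page
        }
    }

    public static void main(String[] args) {
        System.out.println(connectorFor("1.13", "2.3.5")); // Optional[hbase-2.2]
        System.out.println(connectorFor("1.11", "2.2.1")); // Optional.empty
    }
}
```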
CREATE TABLE hbase_table (
  rowkey INT,
  cf ROW<school_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',                      -- Use 'hbase-2.2' for HBase v2.2.x (Flink v1.13 and later).
  'table-name' = 'hbase_sink_table',              -- The name of the HBase table.
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'  -- The ZooKeeper quorum of the HBase cluster.
);
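The DDL relies on the HBase connector's schema mapping:

- The ROW-typed field cf names a column family.
- Its nested field school_name names a qualifier in that family.
- The atomic field rowkey becomes the HBase row key.

The sketch below shows where one row of hbase_table would be stored. It assumes the encodings from the type mapping table: a 4-byte big-endian int for the row key and UTF-8 for strings. It also assumes school_name is non-null. HBaseCellSketch and toCell are hypothetical names; the real connector builds HBase mutations rather than this Cell class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch: the HBase cell coordinates for one row of hbase_table.
class HBaseCellSketch {

    // One HBase cell, with every coordinate held as raw bytes.
    static final class Cell {
        final byte[] row;
        final byte[] family;
        final byte[] qualifier;
        final byte[] value;

        Cell(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical helper: maps (rowkey INT, cf ROW<school_name STRING>) to one cell.
    // Assumes schoolName is non-null.
    static Cell toCell(int rowkey, String schoolName) {
        byte[] row = ByteBuffer.allocate(Integer.BYTES).putInt(rowkey).array(); // 4-byte big-endian int
        return new Cell(row, utf8("cf"), utf8("school_name"), utf8(schoolName));
    }

    public static void main(String[] args) {
        Cell cell = toCell(42, "Example School");
        System.out.println(Arrays.toString(cell.row)); // [0, 0, 0, 42]
        System.out.println(new String(cell.family, StandardCharsets.UTF_8)
                + ":" + new String(cell.qualifier, StandardCharsets.UTF_8)); // cf:school_name
    }
}
```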
Option | Description | Required | Remarks |
connector | The connector to use. | Yes | hbase-1.4 or hbase-2.2. If HBase 2.3.x is used, set connector to hbase-2.2. |
table-name | The name of the HBase table. | Yes | - |
zookeeper.quorum | The ZooKeeper quorum address of the HBase cluster. | Yes | - |
zookeeper.znode.parent | The root directory of the HBase cluster in ZooKeeper. | No | - |
null-string-literal | When a Flink field of string type is null, this literal is written to the corresponding HBase field in its place. | No | Default value: null. |
sink.buffer-flush.max-size | The maximum size (in bytes) of data buffered in memory before it is written to HBase. A larger value can improve HBase write performance but increases write latency and memory usage. Applies only when the HBase connector is used as a sink. | No | Default value: 2 MB. Supported units: B, KB, MB, and GB (case-insensitive). Setting it to 0 means no data is cached. |
sink.buffer-flush.max-rows | The maximum number of records buffered in memory before they are written to HBase. A larger value can improve HBase write performance but increases write latency and memory usage. Applies only when the HBase connector is used as a sink. | No | Default value: 1000. Setting it to 0 means no data is cached. |
sink.buffer-flush.interval | The interval at which buffered data is flushed to HBase; use it to bound the delay of writes to HBase. Applies only when the HBase connector is used as a sink. | No | Default value: 1s. Supported units: ms, s, min, h, and d. Setting it to 0 disables periodic flushing. |
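The null-string-literal option applies only to string columns.

The sketch below models two things:

- The documented write path: a null Flink STRING is stored as the literal's bytes.
- An assumed read path: bytes equal to the literal are turned back into null.

NullStringLiteralSketch is a hypothetical class. It uses UTF-8, matching the STRING conversion in the type mapping.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of the null-string-literal behavior for STRING columns; not the connector's code.
class NullStringLiteralSketch {
    private final byte[] nullStringBytes;

    NullStringLiteralSketch(String nullStringLiteral) {
        this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
    }

    // Write path (documented): a null Flink STRING is stored as the literal's bytes.
    byte[] encode(String value) {
        return value == null ? nullStringBytes.clone() : value.getBytes(StandardCharsets.UTF_8);
    }

    // Read path (assumed): bytes equal to the literal are read back as null.
    String decode(byte[] bytes) {
        return Arrays.equals(bytes, nullStringBytes) ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        NullStringLiteralSketch serde = new NullStringLiteralSketch("null"); // the default literal
        byte[] stored = serde.encode(null);
        System.out.println(new String(stored, StandardCharsets.UTF_8)); // null
        System.out.println(serde.decode(stored) == null);                // true
        // Caveat of the assumed read path: a real value equal to the literal also reads back as null.
        System.out.println(serde.decode(serde.encode("null")) == null);  // true
    }
}
```

Under the assumed read path, pick a literal that never occurs as real data. With the default null, a genuine string "null" cannot be told apart from a missing value.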
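The three sink.buffer-flush options can be read as independent flush triggers: whichever threshold is reached first causes the buffered data to be written. The sketch below models that logic under three assumptions:

- Time is passed in explicitly instead of coming from a real timer.
- A size or row threshold of 0 means every record is flushed immediately, following the remarks in the options table.
- BufferFlushSketch is a hypothetical class, not the connector's sink.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the sink.buffer-flush.* triggers; not the connector's implementation.
class BufferFlushSketch {
    private final long maxSizeBytes;   // models sink.buffer-flush.max-size
    private final long maxRows;        // models sink.buffer-flush.max-rows
    private final long intervalMillis; // models sink.buffer-flush.interval; 0 disables periodic flushes

    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    private int flushes = 0;

    BufferFlushSketch(long maxSizeBytes, long maxRows, long intervalMillis, long startMillis) {
        this.maxSizeBytes = maxSizeBytes;
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Buffers one serialized record, then flushes if the size or row threshold is reached.
    // With a threshold of 0 every record is flushed at once, so nothing stays cached.
    void write(byte[] record, long nowMillis) {
        buffer.add(record);
        bufferedBytes += record.length;
        if (bufferedBytes >= maxSizeBytes || buffer.size() >= maxRows) {
            flush(nowMillis);
        }
    }

    // A real sink would call this from a timer; flushes if the interval has elapsed.
    void onTimer(long nowMillis) {
        if (intervalMillis > 0 && !buffer.isEmpty() && nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        // A real sink would send the buffered mutations to HBase here.
        buffer.clear();
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        flushes++;
    }

    int pendingRows() { return buffer.size(); }

    int flushCount() { return flushes; }

    public static void main(String[] args) {
        // Defaults from this page: 2 MB, 1000 rows, 1 s.
        BufferFlushSketch sink = new BufferFlushSketch(2L * 1024 * 1024, 1000, 1000, 0);
        for (int i = 0; i < 2500; i++) {
            sink.write(new byte[100], 0); // 100-byte records: the row limit triggers first
        }
        System.out.println(sink.flushCount() + " flushes, " + sink.pendingRows() + " pending"); // 2 flushes, 500 pending
        sink.onTimer(1000); // one second later, the interval flushes the remaining rows
        System.out.println(sink.flushCount() + " flushes, " + sink.pendingRows() + " pending"); // 3 flushes, 0 pending
    }
}
```

Raising max-rows or max-size lets more records share one write, at the cost of holding them longer. The interval caps how long a partially filled buffer can wait.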
Flink Type | HBase Conversion (org.apache.hadoop.hbase.util.Bytes) |
CHAR / VARCHAR / STRING | byte[] toBytes(String s); String toString(byte[] b) |
BOOLEAN | byte[] toBytes(boolean b); boolean toBoolean(byte[] b) |
BINARY / VARBINARY | byte[] (stored as-is) |
DECIMAL | byte[] toBytes(BigDecimal v); BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val }; bytes[0] |
SMALLINT | byte[] toBytes(short val); short toShort(byte[] bytes) |
INT | byte[] toBytes(int val); int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val); long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val); float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val); double toDouble(byte[] bytes) |
DATE | Converted to the number of days since January 1, 1970 as an int, then to bytes with byte[] toBytes(int val). |
TIME | Converted to the number of milliseconds since 00:00:00 as an int, then to bytes with byte[] toBytes(int val). |
TIMESTAMP | Converted to the number of milliseconds since 00:00:00 on January 1, 1970 as a long, then to bytes with byte[] toBytes(long val). |
ARRAY | Unsupported |
MAP / MULTISET | Unsupported |
ROW | Unsupported |
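The DATE, TIME, and TIMESTAMP rows add an intermediate numeric step before the byte conversion. The sketch below reproduces those three encodings with java.nio.ByteBuffer, so it runs without the HBase client library. ByteBuffer's default big-endian order matches HBase's Bytes.toBytes(int) and Bytes.toBytes(long). TemporalBytesSketch is a hypothetical class. Reading a TIMESTAMP's wall-clock value as UTC is an assumption of this sketch.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;

// Stand-alone sketch of the temporal encodings in the type mapping table.
class TemporalBytesSketch {

    static byte[] intBytes(int v) { return ByteBuffer.allocate(Integer.BYTES).putInt(v).array(); }

    static byte[] longBytes(long v) { return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); }

    // DATE: days since 1970-01-01 as an int.
    static byte[] encodeDate(LocalDate date) { return intBytes(Math.toIntExact(date.toEpochDay())); }

    static LocalDate decodeDate(byte[] b) { return LocalDate.ofEpochDay(ByteBuffer.wrap(b).getInt()); }

    // TIME: milliseconds since 00:00:00 as an int (sub-millisecond precision is dropped).
    static byte[] encodeTime(LocalTime time) { return intBytes((int) (time.toNanoOfDay() / 1_000_000L)); }

    static LocalTime decodeTime(byte[] b) { return LocalTime.ofNanoOfDay(ByteBuffer.wrap(b).getInt() * 1_000_000L); }

    // TIMESTAMP: milliseconds since 1970-01-01 00:00:00 as a long (wall-clock value read as UTC here).
    static byte[] encodeTimestamp(LocalDateTime ts) { return longBytes(ts.toInstant(ZoneOffset.UTC).toEpochMilli()); }

    static LocalDateTime decodeTimestamp(byte[] b) {
        return Instant.ofEpochMilli(ByteBuffer.wrap(b).getLong()).atOffset(ZoneOffset.UTC).toLocalDateTime();
    }

    public static void main(String[] args) {
        byte[] date = encodeDate(LocalDate.of(2024, 1, 1));
        System.out.println(Arrays.toString(date)); // [0, 0, 77, 11], i.e. 19723 days
        System.out.println(decodeDate(date));      // 2024-01-01
        System.out.println(decodeTime(encodeTime(LocalTime.of(12, 30)))); // 12:30
        System.out.println(ByteBuffer.wrap(encodeTimestamp(LocalDateTime.of(1970, 1, 1, 0, 0, 1))).getLong()); // 1000
    }
}
```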
CREATE TABLE datagen_source_table (
  id INT,
  name STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE hbase_table (
  rowkey INT,
  cf ROW<school_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',                      -- Use 'hbase-2.2' for HBase v2.2.x (Flink v1.13 and later).
  'table-name' = 'hbase_sink_table',              -- The name of the HBase table.
  'zookeeper.quorum' = 'ip:port,ip:port,ip:port'  -- The ZooKeeper quorum of the HBase cluster.
);

CREATE TABLE blackhole_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT src.id, h.cf.school_name AS name
FROM datagen_source_table src
JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` AS h
  ON src.id = h.rowkey;
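The INSERT statement is a processing-time lookup join. Each row from datagen_source_table probes hbase_table by rowkey when it arrives. Because it is an inner JOIN, rows without a matching rowkey produce no output.

The sketch below models only those semantics, with an in-memory map standing in for the HBase table. LookupJoinSketch is hypothetical and says nothing about how Flink performs the actual HBase reads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of a processing-time lookup join; not how Flink executes it.
class LookupJoinSketch {

    // For each arriving source id, probe the dimension table once by rowkey.
    // Inner-join semantics: source rows with no matching rowkey are dropped.
    static List<String> lookupJoin(List<Integer> sourceIds, Map<Integer, String> hbaseTable) {
        List<String> out = new ArrayList<>();
        for (int id : sourceIds) {
            String schoolName = hbaseTable.get(id); // reads the table as it is right now
            if (schoolName != null) {
                out.add(id + "," + schoolName);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> hbaseTable = new HashMap<>(); // stands in for rowkey -> cf:school_name
        hbaseTable.put(1, "School A");
        System.out.println(lookupJoin(Arrays.asList(1, 2), hbaseTable)); // [1,School A]

        // Later changes are seen only by rows that arrive afterwards;
        // results already emitted are not revised.
        hbaseTable.put(1, "School B");
        hbaseTable.put(2, "School C");
        System.out.println(lookupJoin(Arrays.asList(1, 2), hbaseTable)); // [1,School B, 2,School C]
    }
}
```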
The HBase connector works in upsert mode to exchange changelogs with external systems. Therefore, the primary key must be defined on the rowkey field, and the rowkey field must be declared. If the PRIMARY KEY clause is not declared, the HBase connector uses rowkey as the primary key by default.
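In upsert mode, each write is keyed by rowkey: a new row replaces the stored value for its key, and a delete removes it. The sketch below models this with a map holding one column per rowkey. It assumes a mapping modeled on the upstream Apache Flink HBase sink:

- INSERT and UPDATE_AFTER become puts.
- UPDATE_BEFORE and DELETE become deletes.

UpsertSketch is a hypothetical class.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of upsert writes keyed by rowkey (one column per row); not the connector's code.
class UpsertSketch {

    // Mirrors the names of Flink's RowKind change types.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<Integer, String> table = new TreeMap<>(); // rowkey -> cf:school_name

    void apply(Kind kind, int rowkey, String schoolName) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                table.put(rowkey, schoolName); // put: replaces any current value for this rowkey
                break;
            case UPDATE_BEFORE:
            case DELETE:
                table.remove(rowkey); // delete by rowkey (assumed mapping)
                break;
        }
    }

    Map<Integer, String> snapshot() {
        return Collections.unmodifiableMap(table);
    }

    public static void main(String[] args) {
        UpsertSketch sink = new UpsertSketch();
        sink.apply(Kind.INSERT, 1, "School A");
        sink.apply(Kind.INSERT, 2, "School B");
        sink.apply(Kind.UPDATE_BEFORE, 1, "School A");
        sink.apply(Kind.UPDATE_AFTER, 1, "School C");
        sink.apply(Kind.DELETE, 2, "School B");
        System.out.println(sink.snapshot()); // {1=School C}
    }
}
```

Keying every write on rowkey is why the primary key must be the rowkey field. Two records with the same key collapse into one HBase row instead of producing duplicates.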