flink-connector-hbase
Connector 组件。Flink 版本 | 说明 |
1.11 | 支持 hbase 版本为:1.4.x |
1.13 | 支持 hbase 版本为:1.4.x、2.2.x、2.3.x |
1.14 | 支持 hbase 版本为:1.4.x、2.2.x |
1.16 | 支持 hbase 版本为: 1.4.x、2.2.x |
CREATE TABLE hbase_table (rowkey INT,cf ROW < school_name STRING >,PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2'table-name' = 'hbase_sink_table', -- Hbase 表名'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址);
参数 | 说明 | 是否必填 | 备注 |
connector | 表类型 | 是 | hbase-1.4 或者 hbase-2.2 如果您用了 hbase 2.3.x 版本,那么,connector 参数值需要替换为 hbase-2.2 |
table-name | HBase 表名 | 是 | - |
zookeeper.quorum | HBase 的 zookeeper 地址 | 是 | - |
zookeeper.znode.parent | HBase 在 zookeeper 中的根目录 | 否 | - |
null-string-literal | HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 null-string-literal,并写入 HBase | 否 | 默认为 null |
sink.buffer-flush.max-size | 写入 HBase 前,内存中缓存的数据量(字节)大小。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用 | 否 | 默认值为2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为0表示不进行缓存 |
sink.buffer-flush.max-rows | 写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用 | 否 | 默认值为1000,设置为0表示不进行缓存 |
sink.buffer-flush.interval | 将缓存数据周期性写入到 HBase 的间隔,可以控制写入 HBase 的延迟。仅作为 Sink 时使用。 | 否 | 默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0表示关闭定期写入 |
Flink 字段类型 | HBase 转换 |
CHAR / VARCHAR / STRING | byte[] toBytes(String s) String toString(byte[] b) |
BOOLEAN | byte[] toBytes(boolean b)boolean toBoolean(byte[] b) |
BINARY / VARBINARY | byte[] |
DECIMAL | byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val } bytes[0] |
SMALLINT | byte[] toBytes(short val)short toShort(byte[] bytes) |
INT | byte[] toBytes(int val)int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val)long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val)float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val)double toDouble(byte[] bytes) |
DATE | 将日期转换成自1970.01.01以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组 |
TIME | 将时间转换成自00:00:00以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组 |
TIMESTAMP | 将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组 |
ARRAY | 不支持 |
MAP / MULTISET | 不支持 |
ROW | 不支持 |
CREATE TABLE datagen_source_table (id INT,name STRING,`proc_time` AS PROCTIME()) with ('connector'='datagen','rows-per-second'='1');CREATE TABLE hbase_table (rowkey INT,cf ROW < school_name STRING >,PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2'table-name' = 'hbase_sink_table', -- Hbase 表名'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址);CREATE TABLE blackhole_sink(id INT,name STRING) with ('connector' = 'blackhole');INSERT INTO blackhole_sinkSELECT id, cf.school_name as name FROM datagen_source_table srcJOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` as h ON src.id = h.rowkey;
upsert
模式工作,与外部系统交换变更日志信息。因此,必须在 HBase 的 rowkey 字段上定义主键(必须声明 rowkey 字段)。如果未声明 PRIMARY KEY 子句,则 HBase 连接器默认将 rowkey 作为主键。
本页内容是否解决了您的问题?