Kudu
Last updated: 2023-11-08 16:01:45

Overview

The Kudu connector reads data from and writes data to Kudu.

Versions

Flink Version | Description
1.11 | Supported
1.13 | Supported
1.14 | Unsupported
1.16 | Supported

Use cases

The Kudu connector can be used as a source (either a general table or the right table in a dimension-table join), as a sink for tuple streams, and as a sink for upsert streams.

Defining a table in DDL

As a source

CREATE TABLE `kudu_source_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Specify the options for connecting to Kudu.
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The addresses of the Kudu master servers.
  'kudu.table' = 'TableName1', -- Replace with the name of your Kudu table, such as "default.TestTable1".
  'kudu.hash-columns' = 'id', -- The hash key (optional).
  'kudu.primary-key-columns' = 'id', -- The primary key (optional).
  'kudu.operation-timeout' = '10000', -- The insert timeout period in ms (optional).
  'kudu.max-buffer-size' = '2000', -- The buffer size (optional).
  'kudu.flush-interval' = '1000' -- The interval at which data is flushed to Kudu (optional).
);
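
Because a source table can also serve as the right table in a dimension-table join, a lookup join might look like the following sketch. It assumes standard Flink SQL lookup-join syntax (`FOR SYSTEM_TIME AS OF`) applies to this connector; the `orders` table, its columns, and the `datagen` settings are hypothetical and exist only to give the join a left side.

```sql
-- Hypothetical left-side stream; `datagen` is Flink's built-in test data generator.
CREATE TABLE `orders` (
  `order_id` BIGINT,
  `user_id` INT,
  `proc_time` AS PROCTIME() -- Processing-time attribute used by the lookup join.
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- Enrich each order with the matching name from the Kudu dimension table.
SELECT o.`order_id`, o.`user_id`, d.`name`
FROM `orders` AS o
LEFT JOIN `kudu_source_table` FOR SYSTEM_TIME AS OF o.`proc_time` AS d
  ON o.`user_id` = d.`id`;
```

With a LEFT JOIN, orders that have no matching row in Kudu are still emitted, with `name` set to NULL.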

As a sink (for tuple streams)

CREATE TABLE `kudu_sink_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Specify the options for connecting to Kudu.
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The addresses of the Kudu master servers.
  'kudu.table' = 'TableName1', -- Replace with the name of your Kudu table, such as "default.TestTable1".
  'kudu.ignore-duplicate' = 'true' -- (Optional) If set to `true`, a row is ignored when its primary key matches that of an existing row.
);

As a sink (for upsert streams)

CREATE TABLE `kudu_upsert_sink_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Specify the options for connecting to Kudu.
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The addresses of the Kudu master servers.
  'kudu.table' = 'TableName1', -- Replace with the name of your Kudu table, such as "default.TestTable1".
  'kudu.hash-columns' = 'id', -- The hash key (optional).
  'kudu.primary-key-columns' = 'id' -- The primary key (required when this connector is used as an upsert sink).
);

WITH parameters

Option | Required | Default Value | Description
connector | Yes | None | Must be 'kudu' to connect to a Kudu database.
kudu.masters | Yes | None | The URL of the Kudu database master server, with a default port of 7051. If you use the Kudu component provided by Tencent Cloud, you can find the master server IP and port as follows: Log in to the EMR console, click the ID/Name of the target cluster in the cluster list to go to its details page, and select Cluster services > Kudu > Operation > View port.
kudu.table | Yes | None | The name of the Kudu table. For example, a Kudu table created through Impala is generally named impala::db_name.table_name, and one created with the Java API is named db_name.tablename.
kudu.hash-columns | No | None | The hash key.
kudu.primary-key-columns | No | None | The primary key. Required when the connector is used as an upsert sink.
kudu.replicas | No | None | The number of replicas.
kudu.operation-timeout | No | 30000 | The insert timeout period in ms.
kudu.max-buffer-size | No | 1000 | The buffer size.
kudu.flush-interval | No | 1000 | The interval at which data is flushed to Kudu.
kudu.ignore-not-found | No | false | Whether to ignore the data that is not found.
kudu.ignore-duplicate | No | false | Whether to ignore a row whose primary key matches that of an existing row.
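
As a rough illustration of how several of these options combine, the sketch below defines a tuple-stream sink on a table created through Impala. The table name follows the impala::db_name.table_name pattern described above; the Flink table name `kudu_tuned_sink_table` and the specific option values are illustrative assumptions, not recommendations.

```sql
CREATE TABLE `kudu_tuned_sink_table` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',
  'kudu.table' = 'impala::db_name.table_name', -- A table created through Impala (placeholder name).
  'kudu.operation-timeout' = '60000', -- Raise the insert timeout above the 30000 ms default.
  'kudu.max-buffer-size' = '2000', -- Raise the buffer size above the default of 1000.
  'kudu.ignore-duplicate' = 'true' -- Ignore rows whose primary key already exists.
);
```

Options left out of the WITH clause, such as `kudu.flush-interval`, keep the defaults listed in the table.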

Data type mapping

Flink Type | Kudu Type
STRING | STRING
BOOLEAN | BOOL
TINYINT | INT8
SMALLINT | INT16
INT | INT32
BIGINT | INT64
FLOAT | FLOAT
DOUBLE | DOUBLE
BYTES | BINARY
TIMESTAMP(3) | UNIXTIME_MICROS
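
To make the mapping concrete, here is a minimal sketch of a Flink table that declares one column for each mapped type. The table name `kudu_all_types`, the Kudu table name `default.AllTypes`, and the column names are hypothetical; each comment states the Kudu type the column is expected to map to according to the table above.

```sql
CREATE TABLE `kudu_all_types` (
  `id` BIGINT, -- Kudu INT64
  `name` STRING, -- Kudu STRING
  `is_active` BOOLEAN, -- Kudu BOOL
  `tiny_val` TINYINT, -- Kudu INT8
  `small_val` SMALLINT, -- Kudu INT16
  `int_val` INT, -- Kudu INT32
  `float_val` FLOAT, -- Kudu FLOAT
  `double_val` DOUBLE, -- Kudu DOUBLE
  `payload` BYTES, -- Kudu BINARY
  `updated_at` TIMESTAMP(3) -- Kudu UNIXTIME_MICROS
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',
  'kudu.table' = 'default.AllTypes', -- Hypothetical Kudu table name.
  'kudu.primary-key-columns' = 'id'
);
```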

Example

CREATE TABLE `kudu_source_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Specify the options for connecting to Kudu.
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The addresses of the Kudu master servers.
  'kudu.table' = 'TableName1', -- Replace with the name of your Kudu table, such as "default.TestTable1".
  'kudu.hash-columns' = 'id', -- The hash key (optional).
  'kudu.primary-key-columns' = 'id', -- The primary key (optional).
  'kudu.operation-timeout' = '10000', -- The insert timeout period in ms (optional).
  'kudu.max-buffer-size' = '2000', -- The buffer size (optional).
  'kudu.flush-interval' = '1000' -- The interval at which data is flushed to Kudu (optional).
);

CREATE TABLE `kudu_upsert_sink_table` (
  `id` INT,
  `name` STRING
) WITH (
  -- Specify the options for connecting to Kudu.
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The addresses of the Kudu master servers.
  'kudu.table' = 'TableName1', -- Replace with the name of your Kudu table, such as "default.TestTable1".
  'kudu.hash-columns' = 'id', -- The hash key (optional).
  'kudu.primary-key-columns' = 'id' -- The primary key (required when this connector is used as an upsert sink).
);

INSERT INTO `kudu_upsert_sink_table` SELECT * FROM `kudu_source_table`;
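
An upsert sink is typically fed by a query whose results change over time, such as a GROUP BY aggregation. The following sketch counts rows per name from the source table defined above and writes the running counts to a second Kudu table. The sink name `kudu_name_count_sink` and the Kudu table name `default.NameCount` are hypothetical, and the sketch assumes the connector accepts the update stream that the aggregation produces.

```sql
CREATE TABLE `kudu_name_count_sink` (
  `name` STRING,
  `cnt` BIGINT
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',
  'kudu.table' = 'default.NameCount', -- Hypothetical Kudu table name.
  'kudu.hash-columns' = 'name',
  'kudu.primary-key-columns' = 'name' -- A primary key is required for an upsert sink.
);

-- Each new row for a name updates that name's count, so the sink receives upserts keyed on `name`.
INSERT INTO `kudu_name_count_sink`
SELECT `name`, COUNT(*) AS `cnt`
FROM `kudu_source_table`
GROUP BY `name`;
```

The grouping key matches the sink's primary key, so each name occupies exactly one row in Kudu.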

Notes

1. To query a Kudu table in Impala, first check whether an external table has been created for it.
2. By default, a table created by any means other than Impala Shell has no corresponding external table in Impala. You must create one before you can query the table's records.
3. When the Kudu connector is used as a Stream Compute Service sink and the sink table does not exist in the Kudu database, an internal table is created there.
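
As a sketch of the second note, the statements below map an existing Kudu table to an external table in Impala and then query it. They are Impala SQL, to be run in Impala Shell rather than in a Flink job. The Kudu table name `default.TestTable1` is the sample name used on this page, and the Impala table name `test_table1` is a hypothetical choice.

```sql
-- Map the existing Kudu table to an external table in Impala.
CREATE EXTERNAL TABLE test_table1
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'default.TestTable1');

-- Query the records through the external table.
SELECT * FROM test_table1 LIMIT 10;
```

Dropping an external table in Impala removes only the mapping and leaves the underlying Kudu table and its data in place.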

Kudu authentication with Kerberos

1. Log in to the cluster master node and get the files krb5.conf and emr.keytab from the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
2. Package the files into a JAR file.
jar cvf kudu-xxx.jar krb5.conf emr.keytab
3. Check the JAR structure (for example, by opening the file with vim kudu-xxx.jar). Make sure the JAR file contains the following entries at its top level.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
5. Get the Kerberos principal by listing the entries in the keytab.
klist -kt /var/krb5kdc/emr.keytab

# Sample output. Use the first principal listed: hadoop/172.28.22.43@EMR-E4331BF2
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 07/06/2023 18:50:41 hadoop/172.28.22.43@EMR-E4331BF2
2 07/06/2023 18:50:41 HTTP/172.28.22.43@EMR-E4331BF2
2 07/06/2023 18:50:41 kudu/172.28.22.43@EMR-E4331BF2
6. Configure the principal in advanced job parameters.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.22.43@EMR-E4331BF2
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
fs.hdfs.hadoop.security.authentication: kerberos
Note
Kudu authentication with Kerberos may be unavailable on some existing Stream Compute Service clusters. To use this feature, contact us to upgrade the cluster management service.
