Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Supported |
-- Example DDL: a Flink table named `kudu_source_table` backed by a Kudu table.
-- Each option sits on its own line so that a trailing `--` comment cannot
-- swallow the options that follow it.
CREATE TABLE `kudu_source_table` (
    `id`   INT,
    `name` STRING
) WITH (
    -- Specify the options to connect Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1',            -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id',             -- The hash key (optional).
    'kudu.primary-key-columns' = 'id',      -- The primary key (optional).
    'kudu.operation-timeout' = '10000',     -- The insert timeout period in ms (optional).
    'kudu.max-buffer-size' = '2000',        -- The buffer size (optional).
    'kudu.flush-interval' = '1000'          -- The interval of data flush to Kudu (optional).
);
-- Example DDL: a Flink table named `kudu_sink_table` that writes to a Kudu table.
CREATE TABLE `kudu_sink_table` (
    `id`   INT,
    `name` STRING
) WITH (
    -- Specify the options to connect Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1',            -- Replace it with the table in Kudu, such as "default.TestTable1".
    -- (Optional) If this option is set to `true`, the data will be ignored if
    -- its primary key is identical with that of existing data.
    'kudu.ignore-duplicate' = 'true'
);
-- Example DDL: a Flink table named `kudu_upsert_sink_table` used as an upsert sink.
-- When this connector is used as an upsert sink, a primary key is required.
CREATE TABLE `kudu_upsert_sink_table` (
    `id`   INT,
    `name` STRING
) WITH (
    -- Specify the options to connect Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1',            -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id',             -- The hash key (optional).
    'kudu.primary-key-columns' = 'id'       -- The primary key (required).
);
Option | Required | Default Value | Description |
connector.type | Yes | None | For connection to a Kudu database, it must be 'kudu'. |
kudu.masters | Yes | None | The URL of the Kudu database master server, with a default port of 7051. If the Kudu component provided by Tencent Cloud is used, you can find the master server IP and port this way: Log in to the EMR console, click ID/Name of the target cluster in the cluster list to go to its details page, and select Cluster services > Kudu > Operation > View port. |
kudu.table | Yes | None | The name of the Kudu table. For example, a Kudu table created through Impala is generally named impala::db_name.table_name, and one created with the Java API is named db_name.tablename. |
kudu.hash-columns | No | None | The hash key. |
kudu.primary-key-columns | No | None | The primary key. |
kudu.replicas | No | None | The number of replicas. |
kudu.operation-timeout | No | 30000 | The insert timeout period in ms. |
kudu.max-buffer-size | No | 1000 | The buffer size. Default value: 1000. |
kudu.flush-interval | No | 1000 | The interval of data flush to Kudu. Default value: 1000. |
kudu.ignore-not-found | No | false | Whether to ignore the data that is not found. |
kudu.ignore-duplicate | No | false | Whether to ignore the data whose primary key is identical with that of existing data. |
Flink Type | Kudu Type |
STRING | STRING |
BOOLEAN | BOOL |
TINYINT | INT8 |
SMALLINT | INT16 |
INT | INT32 |
BIGINT | INT64 |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BYTES | BINARY |
TIMESTAMP(3) | UNIXTIME_MICROS |
-- Complete example: read rows from a Kudu source table and upsert them into a
-- Kudu sink table.

-- Source table.
CREATE TABLE `kudu_source_table` (
    `id`   INT,
    `name` STRING
) WITH (
    -- Specify the options to connect Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1',            -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id',             -- The hash key (optional).
    'kudu.primary-key-columns' = 'id',      -- The primary key (optional).
    'kudu.operation-timeout' = '10000',     -- The insert timeout period in ms (optional).
    'kudu.max-buffer-size' = '2000',        -- The buffer size (optional).
    'kudu.flush-interval' = '1000'          -- The interval of data flush to Kudu (optional).
);

-- Upsert sink table. When this connector is used as an upsert sink, a primary
-- key is required.
CREATE TABLE `kudu_upsert_sink_table` (
    `id`   INT,
    `name` STRING
) WITH (
    -- Specify the options to connect Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1',            -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id',             -- The hash key (optional).
    'kudu.primary-key-columns' = 'id'       -- The primary key (required).
);

-- Copy every source row into the sink; columns are listed explicitly so the
-- job does not silently change if either schema gains a column.
INSERT INTO `kudu_upsert_sink_table`
SELECT
    `id`,
    `name`
FROM `kudu_source_table`;
krb5.conf
and emr.keytab
in the following paths:
/etc/krb5.conf
/var/krb5kdc/emr.keytab
jar cvf kudu-xxx.jar krb5.conf emr.keytab
vim kudu-xxx.jar
). Make sure the JAR file includes the following information and has the correct structure:
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
# List the entries (with timestamps) stored in the EMR keytab.
klist -kt /var/krb5kdc/emr.keytab
# The output is as shown below, and you can just use the first principal: hadoop/172.28.22.43@EMR-E4331BF2
# KVNO Timestamp           Principal
# ---- ------------------- ------------------------------------------------------
#    2 07/06/2023 18:50:41 hadoop/172.28.22.43@EMR-E4331BF2
#    2 07/06/2023 18:50:41 HTTP/172.28.22.43@EMR-E4331BF2
#    2 07/06/2023 18:50:41 kudu/172.28.22.43@EMR-E4331BF2
# Job-level Flink settings for Kerberos authentication, one setting per line.

# Set HADOOP_USER_NAME=hadoop in the TaskManager and master container environments.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
# Principal to log in as: the first principal listed by `klist -kt` on the keytab.
security.kerberos.login.principal: hadoop/172.28.22.43@EMR-E4331BF2
# File names of the keytab and krb5 configuration packaged into the JAR.
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
fs.hdfs.hadoop.security.authentication: kerberos
Was this page helpful?