Hudi
Last updated: 2023-11-08 15:56:45

Versions

Flink Version | Description
1.11 | Unsupported
1.13 | Supported (use as source and sink)
1.14 | Unsupported
1.16 | Unsupported

Use cases

This connector can be used as a source or a sink.

Defining a table in DDL

As a sink:
CREATE TABLE hudi_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/mor'
    , 'table.type' = 'MERGE_ON_READ'            -- Creates a MERGE_ON_READ table. Default value: COPY_ON_WRITE.
    , 'write.tasks' = '3'                       -- Default value: 4.
    , 'compaction.tasks' = '4'                  -- Default value: 4.
    -- , 'hive_sync.enable' = 'true'            -- Default value: false.
    -- , 'hive_sync.db' = 'default'
    -- , 'hive_sync.table' = 'datagen_mor_1'
    -- , 'hive_sync.mode' = 'jdbc'
    -- , 'hive_sync.username' = ''
    -- , 'hive_sync.password' = ''
    -- , 'hive_sync.jdbc_url' = 'jdbc:hive2://172.28.1.185:7001'
    -- , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004'
);

As a source:
CREATE TABLE `source` (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata'
    , 'table.type' = 'MERGE_ON_READ'                 -- A MERGE_ON_READ table; its incremental data cannot be read.
    , 'read.tasks' = '1'                             -- The parallelism of the read tasks. Default value: 4.
    , 'hoodie.datasource.query.type' = 'snapshot'    -- Default value: snapshot; other valid values: read_optimized and incremental.
    , 'read.streaming.enabled' = 'true'              -- Enables streaming read.
    , 'read.start-commit' = 'earliest'               -- The start commit instant, in the format 'yyyyMMddHHmmss'; earliest reads from the earliest instant.
    , 'read.streaming.check-interval' = '4'          -- The check interval for streaming read, in seconds. Default value: 60.
);

WITH parameters

Common parameters

Option | Required | Default Value | Description
connector | Yes | None | Must be set to hudi.
path | Yes | None | The data storage path: an hdfs:// path for data stored in HDFS, or cosn://$bucket/$path for data stored in COS.

Parameters when as a sink

Option | Required | Default Value | Description
table.type | No | COPY_ON_WRITE | The Hudi table type. Valid values: COPY_ON_WRITE and MERGE_ON_READ.

HoodieRecord parameters

Option | Required | Default Value | Description
hoodie.datasource.write.recordkey.field | No | uuid | The record key field. If the Flink table has a primary key, use it.
hoodie.datasource.write.partitionpath.field | No | "" | The partition path field. An empty value indicates the table is not partitioned.
write.precombine.field | No | ts | The field used for pre-combining before the actual write. When two records have the same key, the record whose pre-combine field value is larger (as determined by Object.compareTo(..)) is kept.
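
For illustration, below is a minimal sink sketch that sets these record-level options explicitly. The field names reuse those from the DDL examples above; the table name and path are hypothetical.
CREATE TABLE hudi_record_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,  -- record key
    name VARCHAR(10),
    ts TIMESTAMP(3),                            -- pre-combine field
    `partition` VARCHAR(20)                     -- partition path field
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/record_demo'                -- hypothetical path
    , 'hoodie.datasource.write.recordkey.field' = 'uuid'              -- matches the primary key
    , 'hoodie.datasource.write.partitionpath.field' = 'partition'     -- leave empty ('') for a non-partitioned table
    , 'write.precombine.field' = 'ts'                                 -- the record with the larger ts wins on key collisions
);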

Parallelism parameters

Option | Required | Default Value | Description
write.tasks | No | 4 | The parallelism of the tasks that do the actual write.
write.index_bootstrap.tasks | No | None | The parallelism of the tasks that do the index bootstrap; defaults to the parallelism of the execution environment.
write.bucket_assign.tasks | No | None | The parallelism of the tasks that do the bucket assignment; defaults to the parallelism of the execution environment.
compaction.tasks | No | 4 | The parallelism of the tasks that do the actual compaction.

Compaction parameters

Option | Required | Default Value | Description
compaction.schedule.enabled | No | true | Whether to enable compaction scheduling.
compaction.async.enabled | No | true | Whether to use async compaction.
compaction.trigger.strategy | No | num_commits | The compaction trigger strategy. Valid values: num_commits, time_elapsed, num_and_time, and num_or_time.
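
For illustration, the sketch below combines these options on a MERGE_ON_READ sink so that compaction is triggered by either commit count or elapsed time. The path is hypothetical, and compaction.delta_commits and compaction.delta_seconds are standard Hudi options not listed above; the values are assumptions for demonstration, not recommendations.
CREATE TABLE hudi_compaction_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/compaction_demo'   -- hypothetical path
    , 'table.type' = 'MERGE_ON_READ'                         -- compaction applies to MOR tables
    , 'compaction.schedule.enabled' = 'true'                 -- default: true
    , 'compaction.async.enabled' = 'true'                    -- default: true
    , 'compaction.trigger.strategy' = 'num_or_time'          -- compact when either threshold below is reached
    , 'compaction.delta_commits' = '5'                       -- assumption: commit-count threshold
    , 'compaction.delta_seconds' = '3600'                    -- assumption: time threshold, in seconds
);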

Hive metadata sync parameters

Option | Required | Default Value | Description
hive_sync.enable | No | false | Whether to enable Hive metadata sync.
hive_sync.db | No | - | -
hive_sync.table | No | - | -
hive_sync.mode | No | jdbc | Valid values: hms, jdbc, and hiveql.
hive_sync.username | No | - | -
hive_sync.password | No | - | -
hive_sync.jdbc_url | No | - | -
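
For reference, below is a minimal sketch of a sink that syncs table metadata to Hive over JDBC, reusing the connection values from the sink DDL above; the table name, path, database, and Hive table name are placeholders.
CREATE TABLE hudi_hive_sync_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/hive_sync_demo'        -- placeholder path
    , 'table.type' = 'MERGE_ON_READ'
    , 'hive_sync.enable' = 'true'                                -- default: false
    , 'hive_sync.mode' = 'jdbc'                                  -- valid values: hms, jdbc, and hiveql
    , 'hive_sync.db' = 'default'                                 -- placeholder database
    , 'hive_sync.table' = 'hudi_hive_sync_demo'                  -- placeholder Hive table
    , 'hive_sync.jdbc_url' = 'jdbc:hive2://172.28.1.185:7001'    -- from the sink DDL above
    , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004'  -- from the sink DDL above
    , 'hive_sync.username' = ''
    , 'hive_sync.password' = ''
);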

More parameters

For more parameters, see Flink Options.

Parameters when as a source

Option | Required | Default Value | Description
read.tasks | No | 4 | The parallelism of the tasks that do the actual read.
hoodie.datasource.query.type | No | snapshot | Valid values: snapshot, read_optimized, and incremental.
read.streaming.enabled | No | false | Whether to enable streaming read.
read.streaming.check-interval | No | 60 | The check interval for streaming read, in seconds.
read.streaming.skip_compaction | No | false | -
read.start-commit | No | None | The start commit instant for reading, in the format 'yyyyMMddHHmmss'. Set it to earliest to start a streaming read from the earliest instant.
read.end-commit | No | None | The end commit instant for reading; it usually does not need to be set.
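
To make the incremental options concrete, the sketch below performs a bounded incremental read between two commit instants. The path reuses the source DDL above; the commit instants are placeholders in the 'yyyyMMddHHmmss' format.
CREATE TABLE hudi_incremental_source (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata'   -- from the source DDL above
    , 'table.type' = 'MERGE_ON_READ'
    , 'hoodie.datasource.query.type' = 'incremental'       -- read only the commits in the range below
    , 'read.start-commit' = '20231101000000'               -- placeholder start instant
    , 'read.end-commit' = '20231102000000'                 -- placeholder end instant; omit to read up to the latest
);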

More parameters

For more parameters, see Flink Options.

COS configurations

No additional configuration is required. Just set path to the corresponding cosn:// path.
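
For example, a COS-backed sink might look like the following sketch; the bucket name and path are placeholders.
CREATE TABLE hudi_cos_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'cosn://examplebucket-1250000000/data/hudi/cow'   -- placeholder cosn:// path
    , 'table.type' = 'COPY_ON_WRITE'
);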

HDFS configurations

Getting the HDFS JAR package

To write data to Hudi stored in HDFS from a Flink SQL job, a JAR package containing the HDFS configuration is required so that Flink can connect to the target HDFS cluster. The steps to get and use the JAR package are as follows:
1. Log in to the respective EMR cluster using SSH.
2. Get hdfs-site.xml from the following path in the EMR cluster.
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. Package the obtained configuration files in a JAR package.
jar -cvf hdfs-xxx.jar hdfs-site.xml
4. Check the JAR structure (you can run a Vim command to view it). Make sure the JAR file includes the following information and has the correct structure.
vi hdfs-xxx.jar
META-INF/
META-INF/MANIFEST.MF
hdfs-site.xml

Setting the HDFS user

Note
By default, Flink jobs access HDFS as the flink user. If this user does not have permission to write to HDFS, you can use advanced job parameters to switch to a user that has write permission, or to the superuser hadoop.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

Kerberos authentication

1. Log in to the cluster master node and get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml from the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
2. Package the obtained configuration files in a JAR package.
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
3. Check the JAR structure (run the Vim command vim hdfs-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
5. Get the Kerberos principal.
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. Configure the principal in advanced job parameters.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf

FAQs

Hive sync

Hive table sync failed with the following error:
java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
Check whether the Hive environment contains the JAR package required by Hudi (hudi-hadoop-mr-bundle-x.y.z.jar) and download it if it is missing. For details, see Hive.