| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported (use as source and sink) |
| 1.14 | Unsupported |
| 1.16 | Unsupported |
```sql
CREATE TABLE hudi_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/mor'
    , 'table.type' = 'MERGE_ON_READ'            -- The MERGE_ON_READ table, which defaults to COPY_ON_WRITE.
    , 'write.tasks' = '3'                       -- Default value: 4.
    , 'compaction.tasks' = '4'                  -- Default value: 4.
    -- , 'hive_sync.enable' = 'true'            -- Default value: false.
    -- , 'hive_sync.db' = 'default'
    -- , 'hive_sync.table' = 'datagen_mor_1'
    -- , 'hive_sync.mode' = 'jdbc'
    -- , 'hive_sync.username' = ''
    -- , 'hive_sync.password' = ''
    -- , 'hive_sync.jdbc_url' = 'jdbc:hive2://172.28.1.185:7001'
    -- , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004'
);
```
```sql
CREATE TABLE `source` (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata'
    , 'table.type' = 'MERGE_ON_READ'                 -- The MOR table. Its incremental data cannot be read.
    , 'read.tasks' = '1'                             -- The parallelism of the read tasks, which defaults to 4.
    , 'hoodie.datasource.query.type' = 'snapshot'    -- Default value: snapshot; other valid values: read_optimized and incremental.
    , 'read.streaming.enabled' = 'true'              -- This option enables streaming read.
    , 'read.start-commit' = 'earliest'               -- Specifies the start commit instant; the commit time format is 'yyyyMMddHHmmss'.
    , 'read.streaming.check-interval' = '4'
);
```
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | None | Here, it should be `hudi`. |
| path | Yes | None | The data storage path, in the format of `hdfs://` for data storage in HDFS and `COSN://$bucket/$path` for data storage in COS. |
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| table.type | No | COPY_ON_WRITE | The Hudi table type. Valid values: COPY_ON_WRITE and MERGE_ON_READ. |
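For instance, a minimal sink that relies only on the common options above might look like the following sketch. The COSN bucket and path are placeholders, and the table type shown is simply the default made explicit.

```sql
-- Minimal Hudi table using only the common options; the bucket and path are placeholders.
CREATE TABLE hudi_cow_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts   TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'cosn://example-bucket-125xxxxxxx/data/hudi/cow'   -- COSN path for data stored in COS
    , 'table.type' = 'COPY_ON_WRITE'                              -- default table type
);
```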
HoodieRecord parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| hoodie.datasource.write.recordkey.field | No | uuid | The key field. If the Flink table has a primary key, use it. |
| hoodie.datasource.write.partitionpath.field | No | "" | The partition path field. Null indicates the table is not partitioned. |
| write.precombine.field | No | ts | The field used for pre-combining before the actual write. When two records have the same key, the record with the larger value of the pre-combine field is picked, as determined by Object.compareTo(..). |
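As a sketch of how these three parameters fit together: the example below declares an `order_id` key, a `dt` partition path, and a `ts` pre-combine field. All column names and the path are illustrative, not part of the options reference.

```sql
-- Illustrative use of the HoodieRecord parameters; column names and path are placeholders.
CREATE TABLE hudi_keyed_sink (
    order_id VARCHAR(20) PRIMARY KEY NOT ENFORCED,  -- used as the record key
    amount   DOUBLE,
    ts       TIMESTAMP(3),                          -- used for pre-combining (larger ts wins on key collision)
    dt       VARCHAR(10)                            -- used as the partition path
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/keyed'                -- illustrative path
    , 'hoodie.datasource.write.recordkey.field' = 'order_id'    -- key field
    , 'hoodie.datasource.write.partitionpath.field' = 'dt'      -- partition path field
    , 'write.precombine.field' = 'ts'                           -- pre-combine field
);
```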
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| write.tasks | No | 4 | The parallelism of tasks that do the actual write. |
| write.index_bootstrap.tasks | No | None | The parallelism of tasks that do index bootstrap, which defaults to the parallelism of the execution environment. |
| write.bucket_assign.tasks | No | None | The parallelism of tasks that do bucket assignment, which defaults to the parallelism of the execution environment. |
| compaction.tasks | No | 4 | The parallelism of tasks that do the actual compaction. |
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| compaction.schedule.enabled | No | true | Whether to schedule compaction plans. |
| compaction.async.enabled | No | true | Whether to use async compaction. |
| compaction.trigger.strategy | No | num_commits | The compaction trigger strategy. Valid values: num_commits, time_elapsed, num_and_time, and num_or_time. |
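The sketch below shows how the write-parallelism and compaction options above can be combined on a MERGE_ON_READ sink. The parallelism values and the path are examples only, not recommendations.

```sql
-- Illustrative write/compaction tuning; parallelism values and path are examples only.
CREATE TABLE hudi_tuned_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts   TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/tuned'      -- illustrative path
    , 'table.type' = 'MERGE_ON_READ'
    , 'write.tasks' = '2'                             -- write parallelism (default 4)
    , 'write.bucket_assign.tasks' = '2'               -- bucket-assignment parallelism (defaults to the env parallelism)
    , 'compaction.tasks' = '2'                        -- compaction parallelism (default 4)
    , 'compaction.schedule.enabled' = 'true'          -- schedule compaction plans (default true)
    , 'compaction.async.enabled' = 'true'             -- compact asynchronously (default true)
    , 'compaction.trigger.strategy' = 'num_commits'   -- trigger compaction by number of commits (default)
);
```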
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| hive_sync.enable | No | false | - |
| hive_sync.db | No | - | - |
| hive_sync.table | No | - | - |
| hive_sync.mode | No | jdbc | Valid values: hms, jdbc, and hiveql. |
| hive_sync.username | No | - | - |
| hive_sync.password | No | - | - |
| hive_sync.jdbc_url | No | - | - |
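As a sketch of how the Hive sync options fit together, the commented-out block in the sink example above could be enabled as below, here using the hms mode. The database, table name, path, and metastore address are placeholders.

```sql
-- Illustrative Hive metadata sync settings; database, table, path, and URI are placeholders.
CREATE TABLE hudi_hive_synced_sink (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts   TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/synced'                -- illustrative path
    , 'table.type' = 'MERGE_ON_READ'
    , 'hive_sync.enable' = 'true'                                -- default: false
    , 'hive_sync.mode' = 'hms'                                   -- sync through the Hive metastore
    , 'hive_sync.db' = 'default'
    , 'hive_sync.table' = 'hudi_synced'
    , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004'  -- placeholder metastore address
);
```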
| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| read.tasks | No | 4 | The parallelism of tasks that do the actual read. |
| hoodie.datasource.query.type | No | snapshot | Valid values: snapshot, read_optimized, and incremental. |
| read.streaming.enabled | No | false | - |
| read.streaming.check-interval | No | 60 | The check interval for streaming read, in seconds. |
| read.streaming.skip_compaction | No | false | - |
| read.start-commit | No | None | The start commit instant for reading, in the format of 'yyyyMMddHHmmss'. It can be set to earliest to read from the earliest instant for streaming read. |
| read.end-commit | No | None | The end commit instant for reading. It generally does not need to be set. |
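For example, an incremental read over a bounded commit range could be declared as in the sketch below. The path and the two commit instants are placeholders; the instants follow the 'yyyyMMddHHmmss' format described above.

```sql
-- Illustrative incremental read between two commit instants; path and instants are placeholders.
CREATE TABLE hudi_incremental_source (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    ts   TIMESTAMP(3)
) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata'   -- illustrative path
    , 'table.type' = 'COPY_ON_WRITE'
    , 'hoodie.datasource.query.type' = 'incremental'       -- read only the commits in the range
    , 'read.start-commit' = '20240101000000'               -- placeholder start instant (yyyyMMddHHmmss)
    , 'read.end-commit'   = '20240102000000'               -- placeholder end instant (yyyyMMddHHmmss)
);
```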
If the data is stored in COS, set `path` to the respective COSN path.
If the data is stored in HDFS, get `hive-site.xml` and `hdfs-site.xml` from the following paths in the EMR Hive cluster:

```
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
```
Package `hdfs-site.xml` into a JAR file:

```
jar -cvf hdfs-xxx.jar hdfs-site.xml
```

Check the JAR file structure (for example, with `vi hdfs-xxx.jar`). Make sure the JAR file includes the following information and has the correct structure:

```
META-INF/
META-INF/MANIFEST.MF
hdfs-site.xml
```
To access HDFS as the hadoop user, add the following parameters to the job configuration:

```
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
```
For a Kerberos-enabled cluster, get `krb5.conf`, `emr.keytab`, `core-site.xml`, and `hdfs-site.xml` in the following paths:

```
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
```
Package the files into a JAR file:

```
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
```

Check the JAR file structure (for example, with `vim hdfs-xxx.jar`). Make sure the JAR file includes the following information and has the correct structure:

```
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
```
```
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp           Principal
---- ------------------- ------------------------------------------------------
   2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
```
Then set the following parameters in the job configuration:

```
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
```
java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat