Flink Version | Description |
1.11 | Hive v1.1.0, v2.3.2, v2.3.5, and v3.1.1 supported. Use the option 'connector.type' = 'hive'. |
1.13 | Hive v1.0.0 - v1.2.2, v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Use the option 'connector' = 'hive'. |
1.14 | Unsupported |
1.16 | Hive v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Use the option 'connector' = 'hive'. |
CREATE TABLE hive_table (
  `id` INT,
  `name` STRING,
  `dt` STRING,
  `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
  'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
  'hive-version' = '3.1.1',
  'hive-database' = 'testdb',
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Create the table "hive_table" in the Hive database "testdb".
USE testdb;
CREATE TABLE `hive_table` (
  `id` int,
  `name` string
) PARTITIONED BY (`dt` string, `hr` string)
STORED AS ORC;
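If you are unsure of the table's HDFS directory (needed when granting write permission below), a quick way to look it up is to describe the table in Hive. This is a minimal sketch assuming the testdb.hive_table created above; the example path reflects the default warehouse location.

USE testdb;
-- The "Location" field in the output is the table's HDFS directory,
-- e.g. /usr/hive/warehouse/testdb.db/hive_table under the default warehouse path.
DESCRIBE FORMATTED hive_table;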
The hive_table table of the target database testdb must be created in the Hive cluster in advance. If the job does not have permission to write to the table's HDFS directory, grant it with the following command:
hdfs dfs -chmod 777 /usr/hive/warehouse/testdb.db/hive_table
In addition, add the following to the job's advanced parameters so that the job accesses HDFS as the hadoop user:
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
The data is written to testdb.hive_table. Here in the CREATE TABLE statements, the table name used is that in the Hive database (Flink v1.13 supports using the value of the hive-table option to overwrite this value), and the database name is specified using the hive-database option.
Option | Required | Default Value | Description |
connector.type | Yes | None | Available to Flink v1.11. To use the Hive connector, set it to hive . |
connector | Yes | None | Available to Flink v1.13. To use the Hive connector, set it to hive . |
hive-version | Yes | None | The version of the Hive cluster created in the EMR console. |
hive-database | Yes | None | The Hive database to write data to. |
hive-table | No | None | Available to Flink v1.13. If set, its value is used as the table name in the Hive database, overriding the table name in the CREATE TABLE statement. |
sink.partition-commit.trigger | No | process-time | The partition commit trigger. Valid values: process-time : A partition will be committed when a certain time elapses after the partition creation time. The creation time refers to the physical creation time. partition-time : A partition will be committed when a certain time elapses after the partition creation time. Here, the creation time is extracted from partition values. partition-time requires watermark generation to automatically detect partitions. A partition is committed once the watermark passes the time extracted from partition values plus delay . |
sink.partition-commit.delay | No | 0s | The time to wait before closing a partition. A partition will not be committed until the specified time elapses after its creation time. |
sink.partition-commit.policy.kind | Yes | None | The partition committing policy. Valid values (combinations are allowed): success-file : When a partition is committed, a _success file will be generated in the partition's directory. metastore : Add the partition to the Hive Metastore. custom : A user-defined partition committing policy. |
partition.time-extractor.timestamp-pattern | No | None | The partition timestamp extraction format. The timestamps should be yyyy-mm-dd hh:mm:ss , with the placeholders replaced with the corresponding partition fields in the Hive table. By default, yyyy-mm-dd hh:mm:ss is extracted from the first field. If timestamps are extracted from a single partition field dt , you can set this to $dt . If timestamps are extracted from multiple partition fields, such as year , month , day , and hour , you can set this to $year-$month-$day $hour:00:00 (see the example after this table). If timestamps are extracted from two partition fields dt and hour , you can set this to $dt $hour:00:00 . |
sink.partition-commit.policy.class | No | None | The partition committing policy class, which needs to be used together with sink.partition-commit.policy.kind = 'custom' and must implement PartitionCommitPolicy . |
partition.time-extractor.kind | No | default | The partition time extractor, which applies only when sink.partition-commit.trigger is set to partition-time . If you have your own time extractor, set this to custom . |
partition.time-extractor.class | No | None | The partition time extractor class. This class should implement the PartitionTimeExtractor API. |
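As an illustration of the multi-field pattern described above, the following sketch declares a hypothetical sink table hive_table_by_hour partitioned by year, month, day, and hour, and assembles the commit timestamp with '$year-$month-$day $hour:00:00'. The table name and partition columns are assumptions for illustration; a matching table must already exist in the testdb Hive database, and the remaining options mirror the example in this document.

CREATE TABLE hive_table_by_hour (
  `id` INT,
  `name` STRING,
  `year` STRING,
  `month` STRING,
  `day` STRING,
  `hour` STRING
) PARTITIONED BY (`year`, `month`, `day`, `hour`)
WITH (
  'connector' = 'hive',  -- For Flink v1.13; on Flink v1.11 use 'connector.type' instead.
  'hive-version' = '3.1.1',
  'hive-database' = 'testdb',
  -- Assemble the partition timestamp from the four partition fields.
  'partition.time-extractor.timestamp-pattern' = '$year-$month-$day $hour:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);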
CREATE TABLE datagen_source_table (
  id INT,
  name STRING,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

CREATE TABLE hive_table (
  `id` INT,
  `name` STRING,
  `dt` STRING,
  `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
  'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
  'hive-version' = '3.1.1',
  'hive-database' = 'testdb',
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- Streaming SQL: insert into the Hive table.
INSERT INTO hive_table
SELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM datagen_source_table;
Get hive-site.xml and hdfs-site.xml from the following paths in the EMR Hive cluster:
/usr/local/service/hive/conf/hive-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
Modify hive-site.xml. Add the following in "hive-site", and set "ip" to the value of "hive.server2.thrift.bind.host":
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip:7004</value>
</property>
Then package the files into a JAR file:
jar -cvf hive-xxx.jar hive-site.xml hdfs-site.xml hivemetastore-site.xml hiveserver2-site.xml
Check the JAR structure (for example, with vi hive-xxx.jar). Make sure the JAR file includes the following information and has the correct structure:
META-INF/
META-INF/MANIFEST.MF
hive-site.xml
hdfs-site.xml
hivemetastore-site.xml
hiveserver2-site.xml
The hive-xxx.jar file obtained in Getting the Hive connection JAR package must be uploaded in Dependencies before use.
To connect to an EMR Hive cluster with Kerberos authentication enabled, get krb5.conf, emr.keytab, core-site.xml, hdfs-site.xml, and hive-site.xml from the following paths:
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/service/hive/conf/hive-site.xml
Modify hive-site.xml. Add the following in "hive-site", and set "ip" to the value of "hive.server2.thrift.bind.host":
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip:7004</value>
</property>
Then package the files into a JAR file:
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
Check the JAR structure (for example, with vim hive-xxx.jar). Make sure the JAR file includes the following information and has the correct structure:
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
hive-site.xml
hivemetastore-site.xml
hiveserver2-site.xml
Get the principal from the keytab file:
klist -kt /var/krb5kdc/emr.keytab
# The output is as follows (use the first principal): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp           Principal
---- ------------------- ------------------------------------------------------
   2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
Add the following to the job's advanced parameters:
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: ${krb5.conf.fileName}
If the newly written partitions cannot be queried in Hive, run the following statement in the Hive cluster to repair the partition metadata (replace hive_table_xxx with the name of the table to be fixed):
msck repair table hive_table_xxx;
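To confirm that the repaired partitions are now visible, you can, for example, list them and sample a few rows in Hive. This sketch assumes the testdb.hive_table used in the examples above.

USE testdb;
-- List the partitions now registered in the Hive Metastore.
SHOW PARTITIONS hive_table;
-- Sample a few rows to confirm the data landed.
SELECT * FROM hive_table LIMIT 10;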