Hive

Last updated: 2023-11-08 14:50:37

Overview

The Hive connector can be used as a sink for streams, but it supports only append streams, not upsert streams. Supported data formats include Text, SequenceFile, ORC, and Parquet.

Versions

Flink Version    Description
1.11             Hive v1.1.0, v2.3.2, v2.3.5, and v3.1.1 supported. Option 'connector.type' = 'hive'.
1.13             Hive v1.0.0 - v1.2.2, v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Option 'connector' = 'hive'.
1.14             Unsupported
1.16             Hive v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Option 'connector' = 'hive'.

Defining a table in DDL

As a sink

CREATE TABLE hive_table (
    `id` INT,
    `name` STRING,
    `dt` STRING,
    `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
    'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

Job configurations

Create a Hive table in a Hive database.
-- Create the table "hive_table" in the Hive database "testdb".
USE testdb;
CREATE TABLE `hive_table` (
    `id` int,
    `name` string
) PARTITIONED BY (`dt` string, `hr` string)
STORED AS ORC;
Grant write access to the HDFS path of the Hive table using one of the following methods.
Method 1: Log in to the EMR Hive cluster as instructed in Basic Hive Operations, and run the chmod command on the hive_table table of the target database testdb.
hdfs dfs -chmod 777 /usr/hive/warehouse/testdb.db/hive_table
Method 2: Go to Jobs > Job parameters of the target job in the console, and add the following advanced parameters to gain access to the HDFS path using the Hadoop user role.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
Note
The Hive table referenced in the Flink SQL statements is testdb.hive_table. In the Flink CREATE TABLE statement, the table name must match the table name in the Hive database (Flink v1.13 supports using the hive-table option to override it), and the database name is specified with the hive-database option, as sketched below.
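A minimal sketch of this override with Flink v1.13, assuming the testdb.hive_table table created above; the Flink table name flink_hive_sink is hypothetical, and the other partition commit options shown in the earlier DDL are omitted for brevity.
CREATE TABLE flink_hive_sink (  -- hypothetical Flink table name
    `id` INT,
    `name` STRING,
    `dt` STRING,
    `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
    'connector' = 'hive',
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'hive-table' = 'hive_table',  -- data is written to testdb.hive_table, not to a table named flink_hive_sink
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);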

WITH parameters

Option
Required
Default Value
Description
connector.type
Yes
None
Available to Flink v1.11. To use the Hive connector, set it to hive.
connector
Yes
None
Available to Flink v1.13 and later. To use the Hive connector, set it to hive.
hive-version
Yes
None
The version of the Hive cluster created in the EMR console.
hive-database
Yes
None
The Hive database to write data to.
hive-table
No
None
Available to Flink v1.13. Its value is used as the target table name in the Hive database instead of the Flink table name.
sink.partition-commit.trigger
No
process-time
The partition commit trigger. Valid values:
process-time: A partition is committed after the specified delay elapses following the partition's creation time, where the creation time is the physical (system) creation time.
partition-time: A partition is committed after the specified delay elapses following the partition time extracted from the partition values. This mode requires watermark generation to detect partitions automatically; a partition is committed once the watermark passes the time extracted from the partition values plus the delay.
sink.partition-commit.delay
No
0s
The time to wait before closing a partition. A partition will not be committed until the specified time elapses after its creation time.
sink.partition-commit.policy.kind
Yes
None
The partition committing policy. Valid values (combinations are allowed):
success-file: When a partition is committed, a _success file will be generated in the partition's directory.
metastore: Add the partition to the Hive Metastore.
custom: A user-defined partition committing policy.
partition.time-extractor.timestamp-pattern
No
None
The pattern for extracting the partition timestamp. The extracted timestamp must be in the format yyyy-mm-dd hh:mm:ss, with the placeholders replaced by the corresponding partition fields of the Hive table. By default, yyyy-mm-dd hh:mm:ss is extracted from the first partition field.
If timestamps are extracted from a single partition field dt, you can set this to $dt.
If timestamps are extracted from multiple partition fields, such as year, month, day, and hour, you can set this to $year-$month-$day $hour:00:00.
If timestamps are extracted from two partition fields dt and hour, you can set this to $dt $hour:00:00.
sink.partition-commit.policy.class
No
None
The partition committing policy class, which must be used together with sink.partition-commit.policy.kind = 'custom' and must implement PartitionCommitPolicy (a configuration sketch follows this table).
partition.time-extractor.kind
No
default
The partition time extractor, which applies only when sink.partition-commit.trigger is set to partition-time. If you have your own time extractor, set this to custom.
partition.time-extractor.class
No
None
The partition time extractor class, which must implement the PartitionTimeExtractor interface.
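A minimal sketch of a sink that combines the built-in metastore policy with a custom commit policy; the class name com.example.MyCommitPolicy is hypothetical and must refer to a PartitionCommitPolicy implementation available on the job's classpath.
CREATE TABLE hive_table_with_custom_policy (
    `id` INT,
    `name` STRING,
    `dt` STRING,
    `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
    'connector' = 'hive',
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'hive-table' = 'hive_table',
    'sink.partition-commit.trigger' = 'process-time',  -- commit based on the physical creation time
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,custom',
    'sink.partition-commit.policy.class' = 'com.example.MyCommitPolicy'  -- hypothetical custom policy class
);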

Example

CREATE TABLE datagen_source_table (
    id INT,
    name STRING,
    log_ts TIMESTAMP(3),
    WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10'
);

CREATE TABLE hive_table (
    `id` INT,
    `name` STRING,
    `dt` STRING,
    `hr` STRING
) PARTITIONED BY (dt, hr)
WITH (
    'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- streaming sql, insert into hive table
INSERT INTO hive_table
SELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM datagen_source_table;
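
After the job has run for a while and partitions have been committed, you can verify the result from the Hive client; the partition values below are only illustrative.
-- Run in the Hive client.
SHOW PARTITIONS testdb.hive_table;
SELECT * FROM testdb.hive_table WHERE dt = '2023-11-08' AND hr = '14' LIMIT 10;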

Hive configurations

Getting the Hive connection JAR package

To write data to Hive in a Flink SQL task, a JAR package containing Hive and HDFS configurations is required to connect Flink to the target Hive cluster. The steps to get the JAR package and to use it are as follows:
1. Log in to the respective Hive cluster using SSH.
2. Get hive-site.xml and hdfs-site.xml from the following paths in the EMR Hive cluster.
/usr/local/service/hive/conf/hive-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. Modify hive-site.xml.
Add the following property to hive-site.xml, and set ip to the value of hive.server2.thrift.bind.host.
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:7004</value>
</property>
4. Package the obtained configuration files into a JAR package.
jar -cvf hive-xxx.jar hive-site.xml hdfs-site.xml hivemetastore-site.xml hiveserver2-site.xml
5. Check the JAR structure (run vi hive-xxx.jar). Make sure the JAR file includes the following files and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
hive-site.xml
hdfs-site.xml
hivemetastore-site.xml
hiveserver2-site.xml

Using a JAR package in a task

Select the Hive connection JAR package as a referenced package for the job. This is the hive-xxx.jar file obtained in Getting the Hive connection JAR package, and it must be uploaded on the Dependencies page before use.

Kerberos authentication

1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, hdfs-site.xml, and hive-site.xml in the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/service/hive/conf/hive-site.xml
2. Modify hive-site.xml. Add the following property to hive-site.xml, and set ip to the value of hive.server2.thrift.bind.host.
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:7004</value>
</property>
3. Package the obtained configuration files into a JAR package.
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
4. Check the JAR structure (run vim hive-xxx.jar). Make sure the JAR file includes the following files and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
hive-site.xml
hivemetastore-site.xml
hiveserver2-site.xml
5. Upload the JAR file on the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
6. Get the Kerberos principals to configure advanced job parameters.
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: ${krb5.conf.fileName}
Note
Kerberos authentication is not supported for historical Stream Compute Service clusters. To support the feature, please contact us to upgrade the cluster management service.

Notes

If the Flink job runs properly and no errors are reported in the logs, but the Hive table cannot be found in the Hive client, repair the table metadata with the following command (replace hive_table_xxx with the name of the table to be repaired).
msck repair table hive_table_xxx;
