Flink 版本 | 说明 |
1.11 | 支持 hive 版本 1.1.0、2.3.2、2.3.5、3.1.1 配置项 'connector.type' = 'hive' |
1.13 | 支持 hive 版本 1.0.0 ~ 1.2.2、2.0.0 ~ 2.2.0、2.3.0 ~ 2.3.6、3.0.0 ~ 3.1.2 配置项 'connector' = 'hive' |
1.14 | 不支持 |
1.16 | 支持 hive 版本 2.0.0 ~ 2.2.0、2.3.0 ~ 2.3.6、3.0.0 ~ 3.1.2 配置项 'connector' = 'hive' |
CREATE TABLE hive_table (`id` INT,`name` STRING,`dt` STRING,`hr` STRING) PARTITIONED BY (dt, hr)with ('connector' = 'hive', -- Flink 1.13 请使用 'connector' = 'hive''hive-version' = '3.1.1','hive-database' = 'testdb','partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');
# 在 Hive 的 testdb 数据库创建 hive_table 数据表USE testdb;CREATE TABLE `hive_table` (`id` int,`name` string)PARTITIONED BY (`dt` string, `hr` string)STORED AS ORC;
hdfs dfs -chmod 777 /usr/hive/warehouse/testdb.db/hive_table
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoop
参数值 | 必填 | 默认值 | 描述 |
connector.type | 是 | 无 | Flink-1.11支持,填 'hive' 选择使用 hive connector。 |
connector | 是 | 无 | Flink-1.13支持,填 'hive' 选择使用 hive connector。 |
hive-version | 是 | 无 | EMR 创建的 Hive 集群对应的版本。 |
hive-database | 是 | 无 | 数据要写入的 Hive database。 |
hive-table | 否 | 无 | Flink-1.13支持,填写后该值会作为Hive库的对应表名 |
sink.partition-commit.trigger | 否 | process-time | 分区关闭策略。可选值包括: process-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间为分区创建时的物理时间。 partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。partition-time 依赖于 watermark 生成,需要配合 wartermark 才能支持自动分区发现。当 watermark 时间超过了 从分区抽取的时间 与 delay 参数配置时间 之和后会提交分区。 |
sink.partition-commit.delay | 否 | 0s | 分区关闭延迟。当分区在创建超过一定时间之后将被关闭。 |
sink.partition-commit.policy.kind | 是 | 无 | 用于提交分区的策略。可选值可以组合使用,可选值包括: success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。 metastore:向 Hive Metastore 更新分区信息。 custom:用户实现的自定义分区提交策略。 |
partition.time-extractor.timestamp-pattern | 否 | 无 | 分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。 如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。 如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。 如果时间戳应该从两个分区字段 'dt' 和 'hour' 提取,可以配置 '$dt $hour:00:00'。 |
sink.partition-commit.policy.class | 否 | 无 | 分区提交类,配合 sink.partition-commit.policy.kind = 'custom' 使用,类必须实现 PartitionCommitPolicy。 |
partition.time-extractor.kind | 否 | default | 分区时间抽取方式。这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom。 |
partition.time-extractor.class | 否 | 无 | 分区时间抽取类,这个类必须实现 PartitionTimeExtractor 接口。 |
CREATE TABLE datagen_source_table (id INT,name STRING,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) WITH ('connector' = 'datagen','rows-per-second' = '10');CREATE TABLE hive_table (`id` INT,`name` STRING,`dt` STRING,`hr` STRING) PARTITIONED BY (dt, hr)with ('connector' = 'hive', -- Flink 1.13 请使用 'connector' = 'hive''hive-version' = '3.1.1','hive-database' = 'testdb','partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');-- streaming sql, insert into hive tableINSERT INTO hive_tableSELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')FROM datagen_source_table;
/usr/local/service/hive/conf/hive-site.xml/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
在hive-site增加如下配置,ip的值取配置文件里 hive.server2.thrift.bind.host 的 value<property><name>hive.metastore.uris</name><value>thrift://ip:7004</value></property>
jar -cvf hive-xxx.jar hive-site.xml hdfs-site.xml hivemetastore-site.xml hiveserver2-site.xml
vi hive-xxx.jar
),jar 里面包含如下信息,请确保文件不缺失且结构正确。META-INF/META-INF/MANIFEST.MFhive-site.xmlhdfs-site.xmlhivemetastore-site.xmlhiveserver2-site.xml
/etc/krb5.conf/var/krb5kdc/emr.keytab/usr/local/service/hadoop/etc/hadoop/core-site.xml/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml/usr/local/service/hive/conf/hive-site.xml
hive.server2.thrift.bind.host
的 value。<property><name>hive.metastore.uris</name><value>thrift://ip:7004</value></property>
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.confhdfs-site.xmlcore-site.xmlhive-site.xmlhivemetastore-site.xmlhiveserver2-site.xml
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoopsecurity.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9security.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: ${krb5.conf.fileName}
msck repair table hive_table_xxx;
本页内容是否解决了您的问题?