Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持,支持常见的 lzo、snappy 压缩算法 |
1.14 | 支持写入到 HDFS,不支持 lzo、snappy 压缩算法 |
1.16 | 支持写入到 HDFS,不支持 lzo、snappy 压缩算法 |
CREATE TABLE `hdfs_sink_table` (`id` INT,`name` STRING,`part1` INT,`part2` INT) PARTITIONED BY (part1, part2) WITH ('connector' = 'filesystem','path' = 'hdfs://HDFS10000/data/', -- cosn://${buketName}/path/to/store/data'format' = 'json','sink.rolling-policy.file-size' = '1M','sink.rolling-policy.rollover-interval' = '10 min','sink.partition-commit.delay' = '1 s','sink.partition-commit.policy.kind' = 'success-file');
参数值 | 必填 | 默认值 | 描述 |
path | 是 | 无 | 文件写入的路径。 |
sink.rolling-policy.file-size | 否 | 128MB | 文件最大大小。当当前写入的文件大小达到设置的阈值时,当前写入的文件将被关闭,并打开一个新的文件进行写入。 |
sink.rolling-policy.rollover-interval | 否 | 30min | 文件最大持续写入时间。当当前写入的文件写入的时间超过了设置的阈值时,当前写入的文件将被关闭,并打开一个新的文件进行写入。 |
sink.rolling-policy.check-interval | 否 | 1min | 文件检查间隔。FileSystem 按照这个间隔检查文件的写入时间是否已经满足了关闭条件,并将满足条件的文件进行关闭。 |
sink.partition-commit.trigger | 否 | process-time | 分区关闭策略。可选值包括: process-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间为分区创建时的物理时间。 partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。partition-time 依赖于 watermark 生成,需要配合 watermark 才能支持自动分区发现。当 watermark 时间超过了 从分区抽取的时间 与 delay 参数配置时间 之和后会提交分区。 |
sink.partition-commit.delay | 否 | 0s | 分区关闭延迟。当分区在创建超过一定时间之后将被关闭。 |
partition.time-extractor.kind | 否 | default | 分区时间抽取方式。这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom。 |
partition.time-extractor.class | 否 | 无 | 分区时间抽取类,这个类必须实现 PartitionTimeExtractor 接口。 |
partition.time-extractor.timestamp-pattern | 否 | 无 | 分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。 如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。 如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。 如果时间戳应该从两个分区字段 'dt' 和 'hour' 提取,可以配置 '$dt $hour:00:00'。 |
sink.partition-commit.policy.kind | 是 | 无 | 用于提交分区的策略。可选值包括: success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。 custom:用户实现的自定义分区提交策略。 |
sink.partition-commit.policy.class | 否 | 无 | 分区提交类,这个类必须实现 PartitionCommitPolicy。 |
hdfs://${dfs.nameserivces}/${path}
,${dfs.nameserivces}
的值可在 hdfs-site.xml 中查找,${path}
为要写入的数据目录。fs.hdfs.dfs.nameservices: HDFS12345fs.hdfs.dfs.ha.namenodes.HDFS12345: nn2,nn1fs.hdfs.dfs.namenode.http-address.HDFS12345.nn1: 172.27.2.57:4008fs.hdfs.dfs.namenode.https-address.HDFS12345.nn1: 172.27.2.57:4009fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn1: 172.27.2.57:4007fs.hdfs.dfs.namenode.http-address.HDFS12345.nn2: 172.27.1.218:4008fs.hdfs.dfs.namenode.https-address.HDFS12345.nn2: 172.27.1.218:4009fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn2: 172.27.1.218:4007fs.hdfs.dfs.client.failover.proxy.provider.HDFS12345: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoop
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNfs.cosn.impl: org.apache.hadoop.fs.CosFileSystemfs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProviderfs.cosn.bucket.region: COS 所在的地域fs.cosn.userinfo.appid: COS 所属用户的 appid
fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapterfs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterfs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/fs.cosn.trsf.fs.ofs.user.appid: COS 所属用户的 appidfs.cosn.trsf.fs.ofs.bucket.region: COS 所在的地域fs.cosn.trsf.fs.ofs.upload.flush.flag: truecontainerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoopfs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNfs.cosn.impl: org.apache.hadoop.fs.CosFileSystemfs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProviderfs.cosn.bucket.region: COS 所在的地域fs.cosn.userinfo.appid: COS 所属用户的 appid
fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapterfs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterfs.ofs.tmp.cache.dir: /tmp/chdfs/fs.ofs.upload.flush.flag: truefs.ofs.user.appid: CHDFS 所属用户的 appidfs.ofs.bucket.region: CHDFS 所在的地域containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoop
/etc/krb5.conf/var/krb5kdc/emr.keytab/usr/local/service/hadoop/etc/hadoop/core-site.xml/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.confhdfs-site.xmlcore-site.xml
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoopsecurity.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9security.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.conf
fs.hdfs.dfs.nameservices: HDFS17995fs.hdfs.dfs.ha.namenodes.HDFS17995: nn2,nn1fs.hdfs.dfs.namenode.http-address.HDFS17995.nn1: 172.28.28.214:4008fs.hdfs.dfs.namenode.https-address.HDFS17995.nn1: 172.28.28.214:4009fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn1: 172.28.28.214:4007fs.hdfs.dfs.namenode.http-address.HDFS17995.nn2: 172.28.28.224:4008fs.hdfs.dfs.namenode.https-address.HDFS17995.nn2: 172.28.28.224:4009fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn2: 172.28.28.224:4007fs.hdfs.dfs.client.failover.proxy.provider.HDFS17995: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProviderfs.hdfs.hadoop.security.authentication: kerberos
CREATE TABLE datagen_source_table ( id INT, name STRING, part1 INT, part2 INT ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒产生的数据条数 'fields.part1.min'='1', 'fields.part1.max'='2', 'fields.part2.min'='1', 'fields.part2.max'='2' );CREATE TABLEhdfs_sink_table
( id INT, name STRING, part1 INT, part2 INT ) PARTITIONED BY (part1, part2) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://HDFS10000/data/', 'format' = 'json', 'sink.rolling-policy.file-size' = '1M', 'sink.rolling-policy.rollover-interval' = '10 min', 'sink.partition-commit.delay' = '1 s', 'sink.partition-commit.policy.kind' = 'success-file' );INSERT INTOhdfs_sink_table
SELECT id, name, part1, part2 FROM datagen_source_table;
CREATE TABLE `hdfs_sink_table` (`id` INT,`name` STRING,`part1` INT,`part2` INT) PARTITIONED BY (part1, part2) WITH ('connector' = 'compressible-fs','hadoop.compression.codec' = 'LzopCodec','path' = 'hdfs://HDFS10000/data/','format' = 'json','sink.rolling-policy.file-size' = '1M','sink.rolling-policy.rollover-interval' = '10 min','sink.partition-commit.delay' = '1 s','sink.partition-commit.policy.kind' = 'success-file');
参数值 | 必填 | 默认值 | 描述 |
hadoop.compression.codec | 否 | 无 | 使用的压缩算法,可选值为 LzopCodec 和 OceanusSnappyCodec,不指定时,按照默认的文件格式写入。其中 OceanusSnappyCodec 是由于 snappy 库版本原因,对于 SnappyCodec 的封装,结果完全同 SnappyCodec |
filename.suffix | 否 | 无 | 最终写入文件名,如果没有声明,则会按照支持的压缩算法生成特定的后缀名,如果采用了非 lzop 和 snappy 压缩算法且未声明该值,则文件后缀为空 |
filepath.contain.partition-key | 否 | false | 写入分区文件时,最终的写入路径是否包括分区字段,默认不包括。例如,假设写入一个按天分区dt=12和按小时分区ht=24的分区路径,默认的分区路径为12/24 而非 dt=12/ht=24 |
本页内容是否解决了您的问题?