| Flink Version | Description |
| ---- | ---- |
| 1.11 | Supported |
| 1.13 | Supported (supports common compression algorithms such as LZO and Snappy) |
| 1.14 | Supports writing to HDFS (does not support the LZO and Snappy compression algorithms) |
| 1.16 | Supports writing to HDFS (does not support the LZO and Snappy compression algorithms) |
```sql
CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/',  -- or cosn://${bucketName}/path/to/store/data
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
);
```
| Option | Required | Default Value | Description |
| ---- | ---- | ---- | ---- |
| path | Yes | - | The path to which files are written. |
| sink.rolling-policy.file-size | No | 128MB | The maximum file size. If a file exceeds this size, it is closed, and a new file is opened for subsequent writes. |
| sink.rolling-policy.rollover-interval | No | 30min | The maximum duration a file can stay open. After this duration elapses, the file is closed, and a new file is opened for subsequent writes. |
| sink.rolling-policy.check-interval | No | 1min | The file check interval. The filesystem connector checks at this interval whether a file should be closed. |
| sink.partition-commit.trigger | No | process-time | The partition commit trigger. Valid values:<br>`process-time`: A partition is committed once the machine's system time passes the partition creation time (the system time at which the partition was created) plus the delay.<br>`partition-time`: A partition is committed once the watermark passes the time extracted from the partition values plus the delay. This trigger requires watermark generation to automatically detect partitions. |
| sink.partition-commit.delay | No | 0s | The time to wait before committing a partition. A partition is not committed until the specified time elapses after its creation time. |
| partition.time-extractor.kind | No | default | The partition time extractor. This option is valid only if `sink.partition-commit.trigger` is `partition-time`. If you have your own time extractor, set this to `custom`. |
| partition.time-extractor.class | No | - | The partition time extractor class. This class should implement the `PartitionTimeExtractor` API. |
| partition.time-extractor.timestamp-pattern | No | - | The partition timestamp extraction pattern. The extracted timestamps should be in the format `yyyy-mm-dd hh:mm:ss`, with the placeholders replaced with the corresponding partition fields of the table. By default, `yyyy-mm-dd hh:mm:ss` is extracted from the first field.<br>If timestamps are extracted from a single partition field `dt`, set this to `$dt`.<br>If timestamps are extracted from multiple partition fields, such as `year`, `month`, `day`, and `hour`, set this to `$year-$month-$day $hour:00:00`.<br>If timestamps are extracted from two partition fields `dt` and `hour`, set this to `$dt $hour:00:00`. |
| sink.partition-commit.policy.kind | Yes | - | The partition committing policy. Valid values:<br>`success-file`: When a partition is committed, a `_SUCCESS` file is generated in the partition's directory.<br>`custom`: A custom committing policy. |
| sink.partition-commit.policy.class | No | - | The partition committing class. This class should implement `PartitionCommitPolicy`. |
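For example, the following sketch (the table and partition fields are illustrative, and the rows must come from a source that defines an event-time watermark) commits each partition one hour after the time extracted from its `dt` and `hr` values:

```sql
-- Sketch: partition-time commits with a multi-field timestamp pattern.
-- Assumes an upstream source with an event-time watermark; table and
-- field names here are illustrative, not part of the examples above.
CREATE TABLE hdfs_sink_by_time (
    id INT,
    name STRING,
    dt STRING,
    hr STRING
) PARTITIONED BY (dt, hr) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',  -- assembled from the two partition fields
    'sink.partition-commit.policy.kind' = 'success-file'
);
```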
You can get the `hdfs-site.xml` file of the Hadoop cluster used. The file includes the parameter values needed for the configuration. `path` should be in the format `hdfs://${dfs.nameservices}/${path}`. You can find the value of `${dfs.nameservices}` in `hdfs-site.xml`, and `${path}` is the path to which data will be written. If the Flink cluster can resolve the HDFS nameservice directly, you only need to set `path`. You don't need to set advanced parameters. Otherwise, set the following advanced job parameters according to `hdfs-site.xml`:

```
fs.hdfs.dfs.nameservices: HDFS12345
fs.hdfs.dfs.ha.namenodes.HDFS12345: nn2,nn1
fs.hdfs.dfs.namenode.http-address.HDFS12345.nn1: 172.27.2.57:4008
fs.hdfs.dfs.namenode.https-address.HDFS12345.nn1: 172.27.2.57:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn1: 172.27.2.57:4007
fs.hdfs.dfs.namenode.http-address.HDFS12345.nn2: 172.27.1.218:4008
fs.hdfs.dfs.namenode.https-address.HDFS12345.nn2: 172.27.1.218:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn2: 172.27.1.218:4007
fs.hdfs.dfs.client.failover.proxy.provider.HDFS12345: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
```
If the HDFS cluster allows only specific usernames (such as `hadoop`) to write data, set the `HADOOP_USER_NAME` environment variable with the following advanced job parameters:

```
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
```
To write data to COS, set `path` to the target COS path (for example, `cosn://${bucketName}/path/to/store/data`) and add the following advanced job parameters:

```
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <the region of the COS bucket>
fs.cosn.userinfo.appid: <the AppID of the account that owns the COS bucket>
```
Writing to COS relies on the connector `flink-connector-cos`, which is built into the image. If the COS bucket has metadata acceleration enabled, use the following advanced job parameters instead:

```
fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/
fs.cosn.trsf.fs.ofs.user.appid: <the AppID of the account that owns the COS bucket>
fs.cosn.trsf.fs.ofs.bucket.region: <the region of the COS bucket>
fs.cosn.trsf.fs.ofs.upload.flush.flag: true
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <the region of the COS bucket>
fs.cosn.userinfo.appid: <the AppID of the account that owns the COS bucket>
```
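With the advanced parameters above in place, the table DDL itself only differs in `path`. A minimal sketch (the bucket name and path below are placeholders, not values from this document):

```sql
-- Sketch: a COS sink. Replace the bucket name (including the AppID
-- suffix) and the path with your own values.
CREATE TABLE cos_sink_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'cosn://mybucket-1250000000/path/to/store/data',
    'format' = 'json'
);
```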
To write data to CHDFS, set `path` to the target CHDFS path and add the following advanced job parameters:

```
fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
fs.ofs.tmp.cache.dir: /tmp/chdfs/
fs.ofs.upload.flush.flag: true
fs.ofs.user.appid: <the AppID of the account that owns the file system>
fs.ofs.bucket.region: <the region of the file system>
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
```
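As with COS, only `path` changes in the DDL. A minimal sketch (the `ofs://` mount address below is a placeholder; use your own CHDFS mount point):

```sql
-- Sketch: a CHDFS sink. The mount address is a placeholder.
CREATE TABLE chdfs_sink_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'ofs://f4xxxxxxxx-xxxx.chdfs.ap-guangzhou.myqcloud.com/path/to/store/data',
    'format' = 'json'
);
```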
If the HDFS cluster has Kerberos authentication enabled, get the configuration files `krb5.conf`, `emr.keytab`, `core-site.xml`, and `hdfs-site.xml` from the following paths on the cluster:

```
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
```
Package the files into a JAR file:

```
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
```
Check the JAR structure (you can run the Vim command `vim hdfs-xxx.jar`). Make sure the JAR file includes the following information and has the correct structure:

```
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
```
Get the Kerberos principal from the keytab:

```
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first principal, hadoop/172.28.28.51@EMR-OQPO48B9):
KVNO Timestamp           Principal
---- ------------------- ------------------------------------------------------
   2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
   2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
   2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
```
Set the following advanced job parameters:

```
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
```
Then set the HDFS parameters according to `hdfs-site.xml`:

```
fs.hdfs.dfs.nameservices: HDFS17995
fs.hdfs.dfs.ha.namenodes.HDFS17995: nn2,nn1
fs.hdfs.dfs.namenode.http-address.HDFS17995.nn1: 172.28.28.214:4008
fs.hdfs.dfs.namenode.https-address.HDFS17995.nn1: 172.28.28.214:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn1: 172.28.28.214:4007
fs.hdfs.dfs.namenode.http-address.HDFS17995.nn2: 172.28.28.224:4008
fs.hdfs.dfs.namenode.https-address.HDFS17995.nn2: 172.28.28.224:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn2: 172.28.28.224:4007
fs.hdfs.dfs.client.failover.proxy.provider.HDFS17995: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
fs.hdfs.hadoop.security.authentication: kerberos
```
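With these parameters in place, the sink DDL reaches the Kerberized cluster through its nameservice in `path`; for instance (a sketch with an illustrative schema):

```sql
-- Sketch: writing to the Kerberos-enabled cluster configured above.
-- The nameservice HDFS17995 matches fs.hdfs.dfs.nameservices.
CREATE TABLE kerberos_hdfs_sink_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS17995/data/',
    'format' = 'json'
);
```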
The following complete example generates random data with the `datagen` connector and writes it to HDFS as JSON:

```sql
CREATE TABLE datagen_source_table (
    id INT,
    name STRING,
    part1 INT,
    part2 INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',  -- The number of records generated per second.
    'fields.part1.min' = '1',
    'fields.part1.max' = '2',
    'fields.part2.min' = '1',
    'fields.part2.max' = '2'
);

CREATE TABLE hdfs_sink_table (
    id INT,
    name STRING,
    part1 INT,
    part2 INT
) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO hdfs_sink_table
SELECT id, name, part1, part2
FROM datagen_source_table;
```
To write compressed data to HDFS, use the `compressible-fs` connector and specify the compression codec:

```sql
CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'compressible-fs',
    'hadoop.compression.codec' = 'LzopCodec',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
);
```
In addition to the options supported by the `filesystem` connector, `compressible-fs` also supports the following three options (a combined sketch follows the table):

| Option | Required | Default Value | Description |
| ---- | ---- | ---- | ---- |
| hadoop.compression.codec | No | - | The compression algorithm to use. Valid values include `LzopCodec` and `OceanusSnappyCodec`. If this is not specified, data is written in the default file format. `OceanusSnappyCodec` is used instead of `SnappyCodec` due to Snappy library version restrictions; it works the same as `SnappyCodec`. |
| filename.suffix | No | - | The suffix of the files to which data is written. If this is not specified, a suffix is generated according to the compression algorithm used. If neither LZOP nor Snappy compression is used and this option is not specified, the filename suffix is empty. |
| filepath.contain.partition-key | No | false | Whether to include the partition fields in the path when data is written into partitioned directories. They are not included by default. For example, if partitioning is based on the date `dt=12` and the hour `ht=24`, the default partition path is `12/24` rather than `dt=12/ht=24`. |
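A sketch combining the three options (the table schema and option values are illustrative): Snappy-compressed output with an explicit filename suffix and Hive-style partition directories:

```sql
-- Sketch: compressible-fs with all three extra options set.
CREATE TABLE compressed_sink_table (
    id INT,
    name STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'compressible-fs',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'hadoop.compression.codec' = 'OceanusSnappyCodec',
    'filename.suffix' = '.snappy',               -- explicit suffix instead of a generated one
    'filepath.contain.partition-key' = 'true',   -- directories become dt=.../ instead of .../
    'sink.partition-commit.policy.kind' = 'success-file'
);
```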