Stream Compute Service
FileSystem
Last updated: 2023-11-08 16:00:41

Overview

The FileSystem connector allows for writing to common file systems such as HDFS and Tencent Cloud Object Storage.

Versions

Flink Version | Description
1.11 | Supported
1.13 | Supported (supports common compression algorithms such as LZO and Snappy)
1.14 | Supports writing to HDFS (does not support the LZO and Snappy compression algorithms)
1.16 | Supports writing to HDFS (does not support the LZO and Snappy compression algorithms)

Limits

The FileSystem connector can be used as a data sink for append-only data streams. It cannot be used as a sink for upsert data currently. The following data formats are supported:
CSV
JSON
Avro
Parquet
ORC
Note
To write data in Avro, Parquet, or ORC format, you need to manually upload a JAR package (see Manually uploading a JAR package below).

Defining a table in DDL

As a sink

CREATE TABLE `hdfs_sink_table` (
`id` INT,
`name` STRING,
`part1` INT,
`part2` INT
) PARTITIONED BY (part1, part2) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://HDFS10000/data/', -- or cosn://${bucketName}/path/to/store/data
'format' = 'json',
'sink.rolling-policy.file-size' = '1M',
'sink.rolling-policy.rollover-interval' = '10 min',
'sink.partition-commit.delay' = '1 s',
'sink.partition-commit.policy.kind' = 'success-file'
);

WITH parameters

path
Required: Yes. Default value: -.
The path to which files are written.

sink.rolling-policy.file-size
Required: No. Default value: 128MB.
The maximum file size. When a file exceeds this size, it is closed, and a new file is opened for writing.

sink.rolling-policy.rollover-interval
Required: No. Default value: 30min.
The maximum duration a file can stay open. After this duration elapses, the file is closed, and a new file is opened for writing.

sink.rolling-policy.check-interval
Required: No. Default value: 1min.
The file check interval. The FileSystem connector checks at this interval whether a file should be closed.

sink.partition-commit.trigger
Required: No. Default value: process-time.
The partition commit trigger. Valid values:
process-time: A partition is committed after a certain time elapses following the partition creation time, where the creation time is the system time at which the partition is created.
partition-time: A partition is committed once the watermark passes the time extracted from the partition values plus the delay. The creation time is extracted from the partition values, and watermark generation is required so that partitions can be detected automatically. See the example after this table.

sink.partition-commit.delay
Required: No. Default value: 0s.
The time to wait before committing a partition. A partition is not committed until the specified time elapses after its creation time.

partition.time-extractor.kind
Required: No. Default value: default.
The partition time extractor. This option is valid only if sink.partition-commit.trigger is partition-time. If you have your own time extractor, set this to custom.

partition.time-extractor.class
Required: No. Default value: -.
The partition time extractor class. This class should implement the PartitionTimeExtractor API.

partition.time-extractor.timestamp-pattern
Required: No. Default value: -.
The partition timestamp extraction pattern. Timestamps must follow the format yyyy-mm-dd hh:mm:ss, with the placeholders replaced by the corresponding partition fields of the Hive table. By default, yyyy-mm-dd hh:mm:ss is extracted from the first field.
If timestamps are extracted from a single partition field dt, you can set this to $dt.
If timestamps are extracted from multiple partition fields, such as year, month, day, and hour, you can set this to $year-$month-$day $hour:00:00.
If timestamps are extracted from two partition fields dt and hour, you can set this to $dt $hour:00:00.

sink.partition-commit.policy.kind
Required: Yes. Default value: -.
The partition commit policy. Valid values:
success-file: When a partition is committed, a _success file is generated in the partition's directory.
custom: A custom commit policy.

sink.partition-commit.policy.class
Required: No. Default value: -.
The partition commit class. This class should implement PartitionCommitPolicy.
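
For example, a sink partitioned by dt and hour that commits partitions based on partition time could be declared as follows. This is a minimal sketch: the table name, schema, and one-hour delay are illustrative, and the upstream query must define a watermark so that partitions can be committed.

CREATE TABLE `hdfs_sink_partition_time` (
`id` INT,
`name` STRING,
`dt` STRING,
`hour` STRING
) PARTITIONED BY (dt, hour) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://HDFS10000/data/',
'format' = 'json',
'sink.partition-commit.trigger' = 'partition-time', -- commit based on the time extracted from partition values
'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
'sink.partition-commit.delay' = '1 h', -- illustrative delay; the partition is committed once the watermark passes its time plus this delay
'sink.partition-commit.policy.kind' = 'success-file'
);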

HDFS configuration

After creating a data directory in HDFS, you need to grant write access to the directory. With Stream Compute Service, data is written to HDFS as the Flink user. Before the configuration, log in to your EMR cluster and download the hdfs-site.xml file of the Hadoop cluster; it contains the parameter values needed for the configuration.
The HDFS path is in the format hdfs://${dfs.nameservices}/${path}. You can find the value of ${dfs.nameservices} in hdfs-site.xml. ${path} is the path to which data will be written.
If the target Hadoop cluster has only one master node, you only need to pass the HDFS path to path. You don't need to set advanced parameters.
If the target Hadoop cluster is a high-availability cluster with two master nodes, after passing the HDFS path, you also need to specify the addresses and ports of the two master nodes by configuring advanced job parameters. Below is an example. You can find the parameter values in hdfs-site.xml.
fs.hdfs.dfs.nameservices: HDFS12345
fs.hdfs.dfs.ha.namenodes.HDFS12345: nn2,nn1
fs.hdfs.dfs.namenode.http-address.HDFS12345.nn1: 172.27.2.57:4008
fs.hdfs.dfs.namenode.https-address.HDFS12345.nn1: 172.27.2.57:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn1: 172.27.2.57:4007
fs.hdfs.dfs.namenode.http-address.HDFS12345.nn2: 172.27.1.218:4008
fs.hdfs.dfs.namenode.https-address.HDFS12345.nn2: 172.27.1.218:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn2: 172.27.1.218:4007
fs.hdfs.dfs.client.failover.proxy.provider.HDFS12345: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
Note
By default, Flink jobs access HDFS as the Flink user. If the Flink user does not have permission to write to HDFS, you can use advanced job parameters to set the accessing user to a user that has write permission or to the super-user hadoop.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

COS configuration

Note: If you write data to COS, the job must run in the same region as the COS bucket.
Use advanced job parameters to specify the COS region. With Stream Compute Service, data is written to COS as the Flink user. You need to configure the following parameters. For the region values, see COS - Regions and Access Endpoints.
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: The region of the COS bucket.
fs.cosn.userinfo.appid: The AppID of the account that owns the COS bucket.
For JAR jobs, to write data to COS, you need to select the built-in connector flink-connector-cos.
Note
If you use Flink 1.16, you can skip this step because flink-connector-cos is built into the image.
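
With these parameters in place, a COS sink table is declared the same way as an HDFS one, just with a cosn:// path. Below is a minimal sketch; the bucket name placeholder, table name, and schema are illustrative.

CREATE TABLE `cos_sink_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'filesystem',
'path' = 'cosn://${bucketName}/path/to/store/data', -- replace ${bucketName} with your COS bucket
'format' = 'json'
);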

Metadata acceleration-enabled COS buckets and Tencent Cloud HDFS

1. Access authorizations. Go to Compute resources > Cluster info > More and select Authorizations.
2. Grant permission to a metadata acceleration-enabled COS bucket or CHDFS.
3. Download the corresponding Flink connector dependencies for your storage type (metadata acceleration-enabled COS bucket or CHDFS), then upload the JAR package on the Dependencies page of the Stream Compute Service console. For details, see Managing Dependencies.
Then configure the following parameters for your storage type.

Metadata acceleration-enabled COS buckets
fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/
fs.cosn.trsf.fs.ofs.user.appid: The AppID of the account that owns the COS bucket.
fs.cosn.trsf.fs.ofs.bucket.region: The region of the COS bucket.
fs.cosn.trsf.fs.ofs.upload.flush.flag: true
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: The region of the COS bucket.
fs.cosn.userinfo.appid: The AppID of the account that owns the COS bucket.
Note
Flink 1.16 supports metadata acceleration-enabled buckets by default. You don't need to download JAR packages. Just configure the following two options. The other parameters will be configured automatically.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
CHDFS
fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
fs.ofs.tmp.cache.dir: /tmp/chdfs/
fs.ofs.upload.flush.flag: true
fs.ofs.user.appid: The AppID of the account that owns the file system.
fs.ofs.bucket.region: The region of the file system.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

Manually uploading a JAR package

1. Download the required JAR package. Download addresses for different Flink versions: Flink 1.11, Flink 1.13, Flink 1.14
2. On the Dependencies page of the Stream Compute Service console, upload the JAR package you downloaded. For details, see Managing Dependencies.
3. Go to the debug page of the job and click Job parameters. Find Referenced package, click Add package, select the JAR package you uploaded in step 2, and click Confirm.
4. Publish the job.

HDFS Kerberos authentication

1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml in the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
2. Package the files into a JAR file.
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
3. Check the JAR structure (run the Vim command vim hdfs-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
5. Get Kerberos principals to configure advanced job parameters.
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
If you use Flink 1.13, you need to add the following configurations to advanced parameters. The parameter values should be the same as those in hdfs-site.xml.
fs.hdfs.dfs.nameservices: HDFS17995
fs.hdfs.dfs.ha.namenodes.HDFS17995: nn2,nn1
fs.hdfs.dfs.namenode.http-address.HDFS17995.nn1: 172.28.28.214:4008
fs.hdfs.dfs.namenode.https-address.HDFS17995.nn1: 172.28.28.214:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn1: 172.28.28.214:4007
fs.hdfs.dfs.namenode.http-address.HDFS17995.nn2: 172.28.28.224:4008
fs.hdfs.dfs.namenode.https-address.HDFS17995.nn2: 172.28.28.224:4009
fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn2: 172.28.28.224:4007
fs.hdfs.dfs.client.failover.proxy.provider.HDFS17995: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
fs.hdfs.hadoop.security.authentication: kerberos
Note
Kerberos authentication is not supported for historical Stream Compute Service clusters. To enable this feature, please contact us to upgrade the cluster management service.

Example


CREATE TABLE datagen_source_table (
id INT,
name STRING,
part1 INT,
part2 INT
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- The number of records generated per second.
'fields.part1.min'='1',
'fields.part1.max'='2',
'fields.part2.min'='1',
'fields.part2.max'='2'
);

CREATE TABLE hdfs_sink_table (
id INT,
name STRING,
part1 INT,
part2 INT
) PARTITIONED BY (part1, part2) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://HDFS10000/data/',
'format' = 'json',
'sink.rolling-policy.file-size' = '1M',
'sink.rolling-policy.rollover-interval' = '10 min',
'sink.partition-commit.delay' = '1 s',
'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO hdfs_sink_table SELECT id, name, part1, part2 FROM datagen_source_table;

About compressible-fs

Can only be used with Flink 1.13.
Supports writing data in the CSV and JSON formats. Other formats, such as Avro, Parquet, and ORC, have built-in compression capabilities.
Supports the compression algorithms LzopCodec and OceanusSnappyCodec.
Supports writing data to HDFS and COS files. The method is the same as that for FileSystem.

As a sink

CREATE TABLE `hdfs_sink_table` (
`id` INT,
`name` STRING,
`part1` INT,
`part2` INT
) PARTITIONED BY (part1, part2) WITH (
'connector' = 'compressible-fs',
'hadoop.compression.codec' = 'LzopCodec',
'path' = 'hdfs://HDFS10000/data/',
'format' = 'json',
'sink.rolling-policy.file-size' = '1M',
'sink.rolling-policy.rollover-interval' = '10 min',
'sink.partition-commit.delay' = '1 s',
'sink.partition-commit.policy.kind' = 'success-file'
);

WITH parameters

In addition to the above parameters supported by the filesystem connector, compressible-fs also supports the following three parameters:
hadoop.compression.codec
Required: No. Default value: -.
The compression algorithm to use. Valid values include LzopCodec and OceanusSnappyCodec. If this is not specified, data is written in the default file format. OceanusSnappyCodec is used instead of SnappyCodec due to Snappy library version restrictions; it works the same as SnappyCodec.

filename.suffix
Required: No. Default value: -.
The suffix of the files to which data is written. If this is not specified, a suffix is generated according to the compression algorithm used. If neither LZOP nor Snappy compression is used and this option is not specified, the filename suffix is empty.

filepath.contain.partition-key
Required: No. Default value: false.
Whether to include the partition field in the path when data is written to partition files. It is not included by default. For example, if partitioning is based on the date dt=12 and hour ht=24, the default partition path is 12/24 instead of dt=12/ht=24. See the example after this table.
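
For example, a compressible-fs sink that writes LZOP-compressed JSON, includes the partition key in the path, and sets an explicit filename suffix could look like the following minimal sketch; the table name, path, and suffix value are illustrative.

CREATE TABLE `compressed_hdfs_sink_table` (
`id` INT,
`name` STRING,
`dt` INT,
`ht` INT
) PARTITIONED BY (dt, ht) WITH (
'connector' = 'compressible-fs',
'path' = 'hdfs://HDFS10000/data/',
'format' = 'json',
'hadoop.compression.codec' = 'LzopCodec',
'filename.suffix' = '.lzo', -- illustrative; if omitted, a suffix is derived from the codec
'filepath.contain.partition-key' = 'true', -- partition paths become dt=12/ht=24 instead of 12/24
'sink.rolling-policy.file-size' = '1M',
'sink.partition-commit.policy.kind' = 'success-file'
);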
