tencent cloud

文档反馈

文件系统 FileSystem

最后更新时间:2023-11-08 16:00:57

    介绍

    FileSystem connector 提供了对 HDFSCOS 等常见文件系统的写入支持。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持,支持常见的 lzo、snappy 压缩算法
    1.14
    支持写入到 HDFS,不支持 lzo、snappy 压缩算法
    1.16
    支持写入到 HDFS,不支持 lzo、snappy 压缩算法

    使用范围

    FileSystem 支持作为 Append-Only 数据流的目的表 (Sink),目前还不支持 Upsert 数据流的目的表。FileSystem 目前支持以下格式的数据写入:
    CSV
    JSON
    Avro
    Parquet
    Orc
    说明
    目前使用数据格式 Avro、Parquet、Orc 写入时,需要 手动上传额外的 jar 包 才能使用。

    DDL 定义

    用作数据目的

    CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
    ) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/', -- cosn://${buketName}/path/to/store/data
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
    );

    WITH 参数

    参数值
    必填
    默认值
    描述
    path
    文件写入的路径。
    sink.rolling-policy.file-size
    128MB
    文件最大大小。当当前写入的文件大小达到设置的阈值时,当前写入的文件将被关闭,并打开一个新的文件进行写入。
    sink.rolling-policy.rollover-interval
    30min
    文件最大持续写入时间。当当前写入的文件写入的时间超过了设置的阈值时,当前写入的文件将被关闭,并打开一个新的文件进行写入。
    sink.rolling-policy.check-interval
    1min
    文件检查间隔。FileSystem 按照这个间隔检查文件的写入时间是否已经满足了关闭条件,并将满足条件的文件进行关闭。
    sink.partition-commit.trigger
    process-time
    分区关闭策略。可选值包括:
    process-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间为分区创建时的物理时间。
    partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。partition-time 依赖于 watermark 生成,需要配合 watermark 才能支持自动分区发现。当 watermark 时间超过了 从分区抽取的时间delay 参数配置时间 之和后会提交分区。
    sink.partition-commit.delay
    0s
    分区关闭延迟。当分区在创建超过一定时间之后将被关闭。
    partition.time-extractor.kind
    default
    分区时间抽取方式。这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom。
    partition.time-extractor.class
    分区时间抽取类,这个类必须实现 PartitionTimeExtractor 接口。
    partition.time-extractor.timestamp-pattern
    分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。
    如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。
    如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。
    如果时间戳应该从两个分区字段 'dt' 和 'hour' 提取,可以配置 '$dt $hour:00:00'。
    sink.partition-commit.policy.kind
    用于提交分区的策略。可选值包括:
    success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。
    custom:用户实现的自定义分区提交策略。
    sink.partition-commit.policy.class
    分区提交类,这个类必须实现 PartitionCommitPolicy。

    HDFS 配置

    在 HDFS 上创建数据目录后,需为目录开启写权限,才可成功写入数据。流计算 Oceanus 写入 HDFS 的 user 是 flink。进行配置前,需要先登录 EMR 集群下载 Hadoop 集群的 hdfs-site.xml 文件,以获取下列配置中所需的参数值。
    HDFS 路径的形式为 hdfs://${dfs.nameserivces}/${path}${dfs.nameserivces} 的值可在 hdfs-site.xml 中查找,${path} 为要写入的数据目录。
    若目标 Hadoop 集群只有单个 Master,仅需要为 path 参数传入 HDFS 路径即可,无需使用高级参数。
    若目标 Hadoop 集群为高可用的双 Master 集群,为 path 参数传入 HDFS 路径后,还需要在作业参数的 高级参数 中对两个 Master 的地址和端口进行配置。以下是一个配置示例,相应的参数值都可在 hdfs-site.xml 中查找并替换。
    fs.hdfs.dfs.nameservices: HDFS12345
    fs.hdfs.dfs.ha.namenodes.HDFS12345: nn2,nn1
    fs.hdfs.dfs.namenode.http-address.HDFS12345.nn1: 172.27.2.57:4008
    fs.hdfs.dfs.namenode.https-address.HDFS12345.nn1: 172.27.2.57:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn1: 172.27.2.57:4007
    fs.hdfs.dfs.namenode.http-address.HDFS12345.nn2: 172.27.1.218:4008
    fs.hdfs.dfs.namenode.https-address.HDFS12345.nn2: 172.27.1.218:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn2: 172.27.1.218:4007
    fs.hdfs.dfs.client.failover.proxy.provider.HDFS12345: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    说明
    Flink 作业默认以 flink 用户操作 HDFS,若没有 HDFS 路径的写入权限,可通过作业 高级参数 设置为有权限的用户,或者设置为超级用户 hadoop。
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop

    COS 配置

    注意:当写入 COS 时,Oceanus 作业所运行的地域必须和 COS 在同一个地域中。
    在作业参数的 高级参数 中对 COS 的地址进行配置。流计算 Oceanus 写入 COS 的 user 是 flink。需要的配置项如下,其中地域的取值可参考 对象存储-地域和访问域名
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
    fs.cosn.bucket.region: COS 所在的地域
    fs.cosn.userinfo.appid: COS 所属用户的 appid
    对于 Jar 作业,在使用 COS 作为数据写入的文件系统时,用户需要在内置 Connector 中勾选 flink-connector-cos。
    注意:
    Flink-1.16已在镜像中内置 flink-connector-cos,无需上述两步。

    元数据加速 COS 桶、CHDFS

    1. 权限工具。 在计算资源 > 集群信息 > 更多选项中选择权限工具。
    2. 配置 COS 元数据加速桶或 CHDFS 权限。
    3. 依赖下载。
    在 Oceanus 的程序包管理上传对应 Jar 包,详情可参见 程序包管理
    元数据加速 COS 桶
    下载对应 Flink 版本的 Connector。
    CHDFS
    4. 作业 高级参数 配置。
    元数据加速 COS 桶
    fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
    fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
    fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/
    fs.cosn.trsf.fs.ofs.user.appid: COS 所属用户的 appid
    fs.cosn.trsf.fs.ofs.bucket.region: COS 所在的地域
    fs.cosn.trsf.fs.ofs.upload.flush.flag: true
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
    fs.cosn.bucket.region: COS 所在的地域
    fs.cosn.userinfo.appid: COS 所属用户的 appid
    注意:
    Flink-1.16 默认支持元数据加速桶,无需下载Jar包,高级参数只需填写如下两项,其他参数已自动填充。
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    CHDFS
    fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
    fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
    fs.ofs.tmp.cache.dir: /tmp/chdfs/
    fs.ofs.upload.flush.flag: true
    fs.ofs.user.appid: CHDFS 所属用户的 appid
    fs.ofs.bucket.region: CHDFS 所在的地域
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop

    手动上传对应 Jar 包

    1. 先下载对应 Jar 包到本地。 不同 Flink 版本下载地址:Flink 1.11Flink 1.13Flink 1.14
    2. 在 Oceanus 的程序包管理上传对应 Jar 包,详情可参见 程序包管理
    3. 进入对应作业的开发调试界面,打开作业参数侧栏。 在作业参数的引用程序包栏单击添加程序包,选择在第2步上传的 Jar 包,单击确定保存作业参数配置。
    4. 发布作业。

    HDFS Kerberos 认证授权

    1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml 文件,路径如下。
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    /usr/local/service/hadoop/etc/hadoop/core-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    2. 对步骤1中获取的文件打 jar 包。
    jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
    3. 校验 jar 的结构(可以通过 vim 命令查看 vim hdfs-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    hdfs-site.xml
    core-site.xml
    4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
    5. 获取 kerberos principal,用于作业 高级参数 配置。
    klist -kt /var/krb5kdc/emr.keytab
    
    # 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. 作业 高级参数 配置。
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    如果是 Flink-1.13 版本,需要在高级参数额外增加如下参数,其中参数的值需要为对应 hdfs-site.xml 中的值。
    fs.hdfs.dfs.nameservices: HDFS17995
    fs.hdfs.dfs.ha.namenodes.HDFS17995: nn2,nn1
    fs.hdfs.dfs.namenode.http-address.HDFS17995.nn1: 172.28.28.214:4008
    fs.hdfs.dfs.namenode.https-address.HDFS17995.nn1: 172.28.28.214:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn1: 172.28.28.214:4007
    fs.hdfs.dfs.namenode.http-address.HDFS17995.nn2: 172.28.28.224:4008
    fs.hdfs.dfs.namenode.https-address.HDFS17995.nn2: 172.28.28.224:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn2: 172.28.28.224:4007
    fs.hdfs.dfs.client.failover.proxy.provider.HDFS17995: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    fs.hdfs.hadoop.security.authentication: kerberos
    注意:
    历史 Oceanus 集群可能不支持该功能,您可通过 在线客服 联系我们升级集群管控服务,以支持 Kerberos 访问。

    代码示例

    
    CREATE TABLE datagen_source_table ( id INT, name STRING, part1 INT, part2 INT ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒产生的数据条数 'fields.part1.min'='1', 'fields.part1.max'='2', 'fields.part2.min'='1', 'fields.part2.max'='2' );
    
    CREATE TABLE hdfs_sink_table ( id INT, name STRING, part1 INT, part2 INT ) PARTITIONED BY (part1, part2) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://HDFS10000/data/', 'format' = 'json', 'sink.rolling-policy.file-size' = '1M', 'sink.rolling-policy.rollover-interval' = '10 min', 'sink.partition-commit.delay' = '1 s', 'sink.partition-commit.policy.kind' = 'success-file' );
    
    INSERT INTO hdfs_sink_table SELECT id, name, part1, part2 FROM datagen_source_table;

    compressible-fs connector使用说明

    只支持在 flink 1.13版本使用。
    支持对于 csv 和 json 两种 format 的写入,其它诸如 avro、parquet、orc 文件格式已经自带压缩功能。
    支持 LzopCodec、OceanusSnappyCodec 两种压缩算法。
    支持写入 hdfs 和 cos 文件,使用方法和 filesystem 一致。

    用作数据目的

    CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
    ) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'compressible-fs',
    'hadoop.compression.codec' = 'LzopCodec',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
    );

    WITH 参数

    除上文中 filesystem connector 支持的参数外,compressible-fs 额外特有的参数有以下三个:
    参数值
    必填
    默认值
    描述
    hadoop.compression.codec
    使用的压缩算法,可选值为 LzopCodec 和 OceanusSnappyCodec,不指定时,按照默认的文件格式写入。其中 OceanusSnappyCodec 是由于 snappy 库版本原因,对于 SnappyCodec 的封装,结果完全同 SnappyCodec
    filename.suffix
    最终写入文件名,如果没有声明,则会按照支持的压缩算法生成特定的后缀名,如果采用了非 lzop 和 snappy 压缩算法且未声明该值,则文件后缀为空
    filepath.contain.partition-key
    false
    写入分区文件时,最终的写入路径是否包括分区字段,默认不包括。例如,假设写入一个按天分区dt=12和按小时分区ht=24的分区路径,默认的分区路径为12/24 而非 dt=12/ht=24
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持