Oceanus 简介
流计算 Oceanus是大数据生态体系的实时化分析利器。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。
目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用 Oceanus 对接对象存储(Cloud Object Storage,COS)。
准备工作
创建 Oceanus 集群
创建 COS 存储桶
2. 在左侧导航栏中,单击存储桶列表。
3. 单击创建存储桶,创建一个存储桶。具体可参见 创建存储桶 文档。 说明
当写入 COS 时,Oceanus 作业所运行的地域必须和 COS 在同一个地域。
实践步骤
1. 创建 Source
CREATE TABLE `random_source` (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.f_sequence.kind'='random',
'fields.f_sequence.min'='1',
'fields.f_sequence.max'='10',
'fields.f_random.kind'='random',
'fields.f_random.min'='1',
'fields.f_random.max'='100',
'fields.f_random_str.length'='10'
);
说明
此处选用内置 connector datagen
,请根据实际业务需求选择相应数据源。
2. 创建 Sink
CREATE TABLE `cos_sink` (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) PARTITIONED BY (f_sequence) WITH (
'connector' = 'filesystem',
'path'='cosn://<存储桶名称>/<文件夹名称>/',
'format' = 'json',
'sink.rolling-policy.file-size' = '128MB',
'sink.rolling-policy.rollover-interval' = '30 min',
'sink.partition-commit.delay' = '1 s',
'sink.partition-commit.policy.kind' = 'success-file'
);
说明
更多 Sink 的 WITH 参数,请参见Filesystem (HDFS/COS)文档。
3. 业务逻辑
INSERT INTO `cos_sink`
SELECT * FROM `random_source`;
4. 作业参数设置
在内置 Connector选择flink-connector-cos
,在高级参数中对 COS 的地址进行如下配置:
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <COS 所在地域>
fs.cosn.userinfo.appid: <COS 所属用户的 appid>
作业配置说明如下:
请将<COS 所在地域>
替换为您实际的 COS 地域,例如:ap-guangzhou。
请将<COS 所属用户的 appid>
替换为您实际的 APPID,具体请进入 账号中心 查看。 说明
具体的作业参数设置请参见Filesystem (HDFS/COS) 文档。
5. 启动作业
依次单击保存 > 语法检查 > 发布草稿,等待 SQL 作业启动后,即可前往相应 COS 目录中查看写入数据。
本页内容是否解决了您的问题?