tencent cloud

对象存储

动态与公告
产品动态
产品公告
产品简介
产品概述
功能概览
应用场景
产品优势
基本概念
地域和访问域名
规格与限制
产品计费
计费概述
计费方式
计费项
免费额度
计费示例
查看和下载账单
欠费说明
常见问题
快速入门
控制台快速入门
COSBrowser 快速入门
用户指南
创建请求
存储桶
对象
数据管理
批量处理
全球加速
监控与告警
运维中心
数据处理
内容审核
智能工具箱
数据工作流
应用集成
工具指南
工具概览
环境安装与配置
COSBrowser 工具
COSCLI 工具
COSCMD 工具
COS Migration 工具
FTP Server 工具
Hadoop 工具
COSDistCp 工具
HDFS TO COS 工具
GooseFS-Lite 工具
在线辅助工具
自助诊断工具
实践教程
概览
访问控制与权限管理
性能优化
使用 AWS S3 SDK 访问 COS
数据容灾备份
域名管理实践
图片处理实践
COS 音视频播放器实践
工作流实践
数据直传
内容审核实践
数据安全
数据校验
大数据实践
COS 成本优化解决方案
在第三方应用中使用 COS
迁移指南
本地数据迁移至 COS
第三方云存储数据迁移至 COS
以 URL 作为源地址的数据迁移至 COS
COS 之间数据迁移
Hadoop 文件系统与 COS 之间的数据迁移
数据湖存储
云原生数据湖
元数据加速
数据加速器 GooseFS
数据处理
数据处理概述
图片处理
媒体处理
内容审核
文件处理
文档处理
故障处理
获取 RequestId 操作指引
通过外网上传文件至 COS 缓慢
访问 COS 时返回403错误码
资源访问异常
POST Object 常见异常
API 文档
简介
公共请求头部
公共响应头部
错误码
请求签名
操作列表
Service 接口
Bucket 接口
Object 接口
批量处理接口
数据处理接口
任务与工作流
内容审核接口
云查毒接口
SDK 文档
SDK 概览
准备工作
Android SDK
C SDK
C++ SDK
.NET(C#) SDK
Flutter SDK
Go SDK
iOS SDK
Java SDK
JavaScript SDK
Node.js SDK
PHP SDK
Python SDK
React Native SDK
小程序 SDK
错误码
鸿蒙(Harmony) SDK
终端 SDK 质量优化
安全与合规
数据容灾
数据安全
访问管理
常见问题
热门问题
一般性问题
计费计量问题
域名合规问题
存储桶配置问题
域名和 CDN 问题
文件操作问题
日志监控问题
权限管理问题
数据处理问题
数据安全问题
预签名 URL 问题
SDK 类问题
工具类问题
API 类问题
服务协议
Service Level Agreement
隐私政策
数据处理和安全协议
联系我们
词汇表
文档对象存储实践教程大数据实践使用流计算 Oceanus 接入 COS

使用流计算 Oceanus 接入 COS

PDF
聚焦模式
字号
最后更新时间: 2024-01-06 10:54:03

Oceanus 简介

流计算 Oceanus是大数据生态体系的实时化分析利器。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。
目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用 Oceanus 对接对象存储(Cloud Object Storage,COS)。

准备工作

创建 Oceanus 集群

登录 Oceanus 控制台,创建一个 Oceanus 集群。

创建 COS 存储桶

1. 登录 COS 控制台
2. 在左侧导航栏中,单击存储桶列表
3. 单击创建存储桶,创建一个存储桶。具体可参见 创建存储桶 文档。
说明
当写入 COS 时,Oceanus 作业所运行的地域必须和 COS 在同一个地域。

实践步骤

前往 Oceanus 控制台,创建一个 SQL 作业,集群选择与 COS 在相同地域的集群。

1. 创建 Source

CREATE TABLE `random_source` (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='10', -- 每秒产生的数据条数
'fields.f_sequence.kind'='random', -- 随机数
'fields.f_sequence.min'='1', -- 随机数的最小值
'fields.f_sequence.max'='10', -- 随机数的最大值
'fields.f_random.kind'='random', -- 随机数
'fields.f_random.min'='1', -- 随机数的最小值
'fields.f_random.max'='100', -- 随机数的最大值
'fields.f_random_str.length'='10' -- 随机字符串的长度
);
说明
此处选用内置 connector datagen,请根据实际业务需求选择相应数据源。

2. 创建 Sink

-- 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名称和文件夹名称
CREATE TABLE `cos_sink` (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) PARTITIONED BY (f_sequence) WITH (
'connector' = 'filesystem',
'path'='cosn://<存储桶名称>/<文件夹名称>/', --- 数据写入的目录路径
'format' = 'json', --- 数据写入的格式
'sink.rolling-policy.file-size' = '128MB', --- 文件最大的大小
'sink.rolling-policy.rollover-interval' = '30 min', --- 文件最大写入时间
'sink.partition-commit.delay' = '1 s', --- 分区提交延迟
'sink.partition-commit.policy.kind' = 'success-file' --- 分区提交方式
);
说明
更多 Sink 的 WITH 参数,请参见Filesystem (HDFS/COS)文档。

3. 业务逻辑

INSERT INTO `cos_sink`
SELECT * FROM `random_source`;
注意
此处只做展示,无实际业务目的。

4. 作业参数设置

内置 Connector选择flink-connector-cos,在高级参数中对 COS 的地址进行如下配置:
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <COS 所在地域>
fs.cosn.userinfo.appid: <COS 所属用户的 appid>
作业配置说明如下:
请将<COS 所在地域>替换为您实际的 COS 地域,例如:ap-guangzhou。
请将<COS 所属用户的 appid>替换为您实际的 APPID,具体请进入 账号中心 查看。
说明
具体的作业参数设置请参见Filesystem (HDFS/COS) 文档。

5. 启动作业

依次单击保存 > 语法检查 > 发布草稿,等待 SQL 作业启动后,即可前往相应 COS 目录中查看写入数据。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈