tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

集成数据到 CKafka

PDF
聚焦模式
字号
最后更新时间: 2026-01-30 15:10:03


实现原理

CKafka 连接器内置 MQTT Source Plugin,通过 MQTT 共享订阅机制,实时接入 MQTT 消息并转发至 CKafka 集群。该共享订阅模式支持高并发配置,可有效保障数据传输吞吐量,充分满足 Kafka 与大数据生态集成时对高流量接入和处理能力的需求。


数据映射

MQTT 消息在转换为 Kafka Record 时, 映射关系如下:


MQTT Message

一条 MQTT 消息由三部分组成: 系统字段、用户属性、Payload 参考: MQTT Control Packet format

系统字段

字段名称
语义
Packet ID
控制指令 ID, 不唯一, 快速复用详见 Spec 2.2.1
Duplicated
详见Spec 3.3.1.1
QoS
详见Spec 3.3.1.2
Retained
详见Spec 3.3.1.3
Message ID
扩展字段, 唯一消息编号
Message Timestamp
扩展字段, 服务端存储消息时间
Publisher Client ID
扩展字段, 发布消息的客户端标识符
Publisher Client Host
扩展字段, 发布消息的客户端 IP
Publisher Username
扩展字段, 发布消息的客户端用户名

User Properties

用户指定的键值对列表, 参见 Spec 3.3.2.3.7

Kafka Record

字段
语义
Key
记录的键值, 可选
Headers
记录关联键值对, 常用来存储元数据, 比如 Content Type、事件时间等,可选
Payload
记录的真正负载数据, 消息体

Headers 使用场景

Message 路由
元数据存储描述
链路追踪和日志
定制化业务处理
安全认证
消息优先级
互操作性/兼容性指令
流处理

业务应用场景

智慧城市与交通数字孪生​​
实时采集城市多源交通数据(如车辆车牌、速度、行驶轨迹),通过 MQTT 主题上报,并借助 Kafka 连接器接入大数据生态。
支持基于车牌号等属性进行高效检索与分析(如车辆轨迹还原),为交通监控、调度和仿真提供数据支撑。

特性与优势

消息队列 CKafka 版是一个分布式、高吞吐量、高可扩展性的消息系统,然其本身并非专为边缘物联网通信场景设计,其客户端通常需要稳定的网络环境和较高的硬件资源,而物联网领域中海量的设备和应用产生的数据往往通过轻量级的 MQTT 协议进行传输。通过 ​​CKafka MQTT 连接器​​实现 MQTT 协议与 CKafka 生态的无缝集成,将设备端发布的 MQTT 消息实时流入 CKafka 主题,确保数据能够被实时处理、存储和进一步分析。该集成不仅保留了 MQTT 在弱网与低资源环境下的通信优势,还充分发挥 CKafka 在高吞吐、高可靠及生态兼容性方面的能力,真正实现了物联网数据与大数据系统间的灵活、稳定和高效整合。

操作步骤

策略与权限

1. 登录 消息队列 MQTT 版控制台,进入集群详情页面,确认当前 MQTT 集群是否开启授权策略管理。
1.1 若未开启权限策略,数据面资源暂无权限管理,可以使用任意“用户名+密码”进行连接、生产和消费等操作,详情见 配置数据面授权。在此情况下,进行数据集成到CKafka的操作时无需其他额外配置,但是由于缺少权限管控,会存在一定的数据安全风险。
1.2 若已开启权限策略,请按照下文所述步骤进行操作。
2. 进入 认证管理 > 用户名和密码,单击新建用户 ,为数据集成任务创建专用账号和密码,用户名为ckafka_connector,并在说明中注明此用户为仅用于 MQTT 与 CKafka 数据集成任务,如图所示。

3. 进入 授权策略管理 页面,单击新建授权策略,强烈建议在本策略中明确授权上一步所创建的 CKafka 数据集成专用账号,以实现精确的权限控制。具体配置方式可参考下图,其余字段请结合实际需求填写,详情参考 配置数据面授权


配置 CKafka 连接器

1. 登录 消息队列 CKafka 版控制台,进入连接列表页面,首先在页面最上方确认连接的所属地域。
2. 单击新建连接,进行连接器的创建

3. 按下图步骤进行连接信息的选择,连接类型选择 MQTT 集群,单击下一步进入连接配置页面。

4. 输入连接名称、描述等基本信息,并在下拉框中选择目标 MQTT 集群。此处的用户名和密码用作连接认证,是在 MQTT 集群中创建的数据集成专用账号密码,详见 策略和权限 小节。单击下一步进入连接校验过程。

5. 当校验均通过后,连接即创建成功。您可在 CKafka 控制台 > 连接器 > 连接列表中查看新增的连接。
对于已创建的连接,连接列表中将展示其基本信息,包括 ID、名称、状态、连接类型、绑定资源、资源所属地域、关联任务数、创建时间、描述等。
单击操作列中的编辑按钮,可修改连接配置。更新连接后,系统会默认开启“更新并重启所有关联任务”的开关,请在操作时根据实际业务需求谨慎选择;
单击操作列中的删除按钮将删除此连接。


创建数据集成任务

前提条件

在 MQTT 集群的同一地域下,已有创建好的 CKafka 实例,具体操作详情参阅消息队列 CKafka 快速入门

任务创建

1. 进入 CKafka 控制台 > 连接器 > 任务列表,单击左上角新建任务,进行任务相关信息填写,任务类型选择 数据接入 > MQTT集群,单击下一步进入数据源配置。

2. 在下拉框中选择合适的连接,若没有合适选项,可单击下方跳转按钮,进入新建连接步骤;输入订阅的 Topic,若订阅多个主题请用“,”隔开。

3. 进行数据目标配置,确定分发策略以及目标CKafka实例。单击提交完成任务创建。

4. 当任务创建成功后,MQTT 集群下会自动新建一个共享订阅组,用于执行数据集成。

也可前往 客户端管理 页面查看执行该任务的连接器客户端详情。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈