实现原理
CKafka 连接器内置 MQTT Source Plugin,通过 MQTT 共享订阅机制,实时接入 MQTT 消息并转发至 CKafka 集群。该共享订阅模式支持高并发配置,可有效保障数据传输吞吐量,充分满足 Kafka 与大数据生态集成时对高流量接入和处理能力的需求。
数据映射
MQTT 消息在转换为 Kafka Record 时, 映射关系如下:
MQTT Message
系统字段
|
Packet ID | |
Duplicated | |
QoS | |
Retained | |
Message ID | 扩展字段, 唯一消息编号 |
Message Timestamp | 扩展字段, 服务端存储消息时间 |
Publisher Client ID | 扩展字段, 发布消息的客户端标识符 |
Publisher Client Host | 扩展字段, 发布消息的客户端 IP |
Publisher Username | 扩展字段, 发布消息的客户端用户名 |
User Properties
Kafka Record
|
Key | 记录的键值, 可选 |
Headers | 记录关联键值对, 常用来存储元数据, 比如 Content Type、事件时间等,可选 |
Payload | 记录的真正负载数据, 消息体 |
Headers 使用场景
Message 路由
元数据存储描述
链路追踪和日志
定制化业务处理
安全认证
消息优先级
互操作性/兼容性指令
流处理
业务应用场景
智慧城市与交通数字孪生
实时采集城市多源交通数据(如车辆车牌、速度、行驶轨迹),通过 MQTT 主题上报,并借助 Kafka 连接器接入大数据生态。
支持基于车牌号等属性进行高效检索与分析(如车辆轨迹还原),为交通监控、调度和仿真提供数据支撑。
特性与优势
消息队列 CKafka 版是一个分布式、高吞吐量、高可扩展性的消息系统,然其本身并非专为边缘物联网通信场景设计,其客户端通常需要稳定的网络环境和较高的硬件资源,而物联网领域中海量的设备和应用产生的数据往往通过轻量级的 MQTT 协议进行传输。通过 CKafka MQTT 连接器实现 MQTT 协议与 CKafka 生态的无缝集成,将设备端发布的 MQTT 消息实时流入 CKafka 主题,确保数据能够被实时处理、存储和进一步分析。该集成不仅保留了 MQTT 在弱网与低资源环境下的通信优势,还充分发挥 CKafka 在高吞吐、高可靠及生态兼容性方面的能力,真正实现了物联网数据与大数据系统间的灵活、稳定和高效整合。
操作步骤
策略与权限
1.1 若未开启权限策略,数据面资源暂无权限管理,可以使用任意“用户名+密码”进行连接、生产和消费等操作,详情见 配置数据面授权。在此情况下,进行数据集成到CKafka的操作时无需其他额外配置,但是由于缺少权限管控,会存在一定的数据安全风险。 1.2 若已开启权限策略,请按照下文所述步骤进行操作。
2. 进入 认证管理 > 用户名和密码,单击新建用户 ,为数据集成任务创建专用账号和密码,用户名为ckafka_connector,并在说明中注明此用户为仅用于 MQTT 与 CKafka 数据集成任务,如图所示。
3. 进入 授权策略管理 页面,单击新建授权策略,强烈建议在本策略中明确授权上一步所创建的 CKafka 数据集成专用账号,以实现精确的权限控制。具体配置方式可参考下图,其余字段请结合实际需求填写,详情参考 配置数据面授权。 配置 CKafka 连接器
2. 单击新建连接,进行连接器的创建
3. 按下图步骤进行连接信息的选择,连接类型选择 MQTT 集群,单击下一步进入连接配置页面。
4. 输入连接名称、描述等基本信息,并在下拉框中选择目标 MQTT 集群。此处的用户名和密码用作连接认证,是在 MQTT 集群中创建的数据集成专用账号密码,详见 策略和权限 小节。单击下一步进入连接校验过程。 5. 当校验均通过后,连接即创建成功。您可在 CKafka 控制台 > 连接器 > 连接列表中查看新增的连接。
对于已创建的连接,连接列表中将展示其基本信息,包括 ID、名称、状态、连接类型、绑定资源、资源所属地域、关联任务数、创建时间、描述等。
单击操作列中的编辑按钮,可修改连接配置。更新连接后,系统会默认开启“更新并重启所有关联任务”的开关,请在操作时根据实际业务需求谨慎选择;
单击操作列中的删除按钮将删除此连接。
创建数据集成任务
前提条件
任务创建
1. 进入 CKafka 控制台 > 连接器 > 任务列表,单击左上角新建任务,进行任务相关信息填写,任务类型选择 数据接入 > MQTT集群,单击下一步进入数据源配置。 2. 在下拉框中选择合适的连接,若没有合适选项,可单击下方跳转按钮,进入新建连接步骤;输入订阅的 Topic,若订阅多个主题请用“,”隔开。
3. 进行数据目标配置,确定分发策略以及目标CKafka实例。单击提交完成任务创建。
4. 当任务创建成功后,MQTT 集群下会自动新建一个共享订阅组,用于执行数据集成。
也可前往 客户端管理 页面查看执行该任务的连接器客户端详情。