tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

使用客户端订阅消息

PDF
聚焦模式
字号
最后更新时间: 2026-01-30 15:10:02
MQTT (Message Queuing Telemetry Transport) 是一种轻量级的、基于发布/订阅(Pub / Sub)模式的物联网消息传输协议。在 MQTT 通信模型中,订阅 (Subscription) 是客户端(订阅者)向服务器(Broker)表达其希望接收特定主题或一类主题消息的核心机制。

发布订阅模型

发布者将消息发送到特定的 Topic,Broker 负责将消息转发给所有订阅了该 Topic 的订阅者。
例如下图场景中,温度传感器将温度读数发布到 “ temperature ” 主题,手机和后端业务系统订阅该主题后,都将获得温度传感器发布的全量数据。


订阅原理

1. 连接建立:客户端首先与 MQTT Broker 建立 TCP 连接,并发送 CONNECT 报文进行认证。
2. 发起订阅:连接成功后,客户端向 Broker 发送一个 SUBSCRIBE 报文。这个报文中包含了一个或多个它想要订阅的主题过滤器 (Topic Filter) 以及对应的服务质量等级 (QoS)。
3. 确认订阅:Broker 收到 SUBSCRIBE 报文后,会回复一个 SUBACK 订阅确认报文。该报文包含了为每个请求订阅的 Topic Filter 所授予的 QoS 等级(可能是客户端请求的,也可能是 Broker 根据自身策略授予的)。
4. 消息路由:此后,一旦有消息发布到匹配该 Topic Filter 的 Topic 上,Broker 就会立即将消息转发给该客户端。
5. 取消订阅:当客户端不再希望接收某些 Topic 的消息时,它会发送一个 UNSUBSCRIBE 报文,Broker 随后停止转发相关消息。

主题过滤器与通配符

客户端在订阅时使用的不是完全确定的 Topic 名,而是可以使用通配符的 Topic Filter,这使得它可以灵活地订阅多个 Topic。
通配符
说明
示例
+
单层通配符,代表匹配一个且仅一个任意主题层级
订阅 sensor/+/temperature
匹配 sensor/bedroom/temperature
匹配 sensor/living_room/temperature
不匹配 sensor/bedroom/upstairs/temperature (因为+只能匹配一层)
不匹配 sensor/temperature (因为 sensor 和 temperature 之间缺少一层)
#
多层通配符,代表匹配零个或多个任意主题层级,必须是主题过滤器的最后一个字符
订阅 sensor/bedroom/#
匹配 sensor/bedroom/temperature
匹配 sensor/bedroom/humidity
匹配 sensor/bedroom/light/intensity
匹配 sensor/bedroom (# 可以匹配零层)
不匹配 sensor/living_room/temperature (因为 bedroom 是固定的)

服务质量 (QoS)

QoS(Quality of Service)指代消息传输的服务质量,每条消息都可以在发送时单独设置 QoS。它包括以下级别:
QoS 等级
工作流程
优点
缺点
适用场景
QoS = 0
(最多分发一次)
发送者将消息发送出去,不要求接收方进行确认
传输速度最快,开销最小
消息可能会丢失,例如网络故障、接收方不在线情况
非关键性的数据,允许偶尔丢失,例如周期性的传感器数据(温度、湿度),丢失下一个数据点很快会补上。
QoS = 1
(至少达到一次)
1. 发送者发送消息并保留一个副本。
2. 接收者收到消息后,必须回复一个 PUBACK (发布确认) 报文。
3. 发送者收到 PUBACK 后,才会丢弃消息副本。
4. 如果发送者在合理时间内未收到 PUBACK,它会重新发送该消息。
保证了消息不会丢失
可能导致消息重复
需要保证消息必达,但可以接受偶尔重复的场景,例如控制指令(“开关灯”),重复执行一次可能也没问题。
QoS = 2
(仅到达一次)
1. PUBLISH:发送者发送消息,并保留副本。
2. PUBREC:接收者收到后回复一个“已收到”确认。如果发送者没收到 PUBREC,会重发 PUBLISH。
3. PUBREL:发送者收到 PUBREC 后,发送一个“发布释放”报文,并可以丢弃消息副本。它现在只需要等待最终确认。
4. PUBCOMP:接收者收到 PUBREL 后,回复一个“发布完成”确认。之后才将消息交付给应用。发送者收到 PUBCOMP,流程结束。如果任何一方没收到应答,都会重发上一条报文。
既保证了消息不丢失,又确保了不会重复
速度最慢,开销最大
对可靠性和准确性要求极高的关键业务,例如计费系统、金融交易、关键状态同步,任何重复或丢失都会造成严重后果。

保留消息 (Retained Messages)

发布一条消息时,可以设置retained参数等于true,Broker 会为这个 Topic 保存最新的这条保留消息。
当一个新的客户端订阅匹配的 Topic 时,Broker 会立即将这条最新的保留消息发送给它,而无需等待其下一次发布,这对于获取设备的最新状态非常有用。
例如,一个温度传感器每隔小时发布一次当前温度并设置为保留消息。任何新上线的客户端在订阅 sensor/temperature 后,能立刻收到当前的最新温度,而不必等待下一个小时。

使用示例

package com.tencent.tdmq.mqtt.example.paho.v5;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class SubscriberQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "SubscriberQuickStart";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "user0";
String password = "secret0";

// MQTT topic filters
String[] topicFilters = new String[]{"home/test", "home/#", "home/+"};
int[] qos = new int[]{1, 1, 1};

MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
client.setTimeToWait(3000);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password.getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setAutomaticReconnect(true);

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}

System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
topic, message.getQos(), content);
List<UserProperty> userProperties = message.getProperties().getUserProperties();
printUserProperties(userProperties);
}

@Override
public void deliveryComplete(IMqttToken token) {
System.out.println("Delivery complete for packet-id: " + token.getMessageId());
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
try {
// Subscribe
IMqttToken token = client.subscribe(topicFilters, qos);
int[] reasonCodes = token.getReasonCodes();
for (int i = 0; i < reasonCodes.length; i++) {
System.out.printf("Subscribed to topic %s with QoS=%d, Granted-QoS: %d%n",
topicFilters[i], qos[i], reasonCodes[i]);
}

if (token.isComplete()) {
List<UserProperty> userProperties = token.getResponseProperties().getUserProperties();
printUserProperties(userProperties);
}
} catch (MqttException e) {
e.printStackTrace();
}
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
System.out.println("Received auth packet with id: " + i);
}
});

client.connect(options);

TimeUnit.MINUTES.sleep(5);

client.disconnect();

client.close();

}

static void printUserProperties(List<UserProperty> userProperties) {
if (null != userProperties) {
for (UserProperty userProperty : userProperties) {
System.out.printf("User property: %s = %s%n", userProperty.getKey(), userProperty.getValue());
}
}
}
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈