tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

配置共享订阅

PDF
聚焦模式
字号
最后更新时间: 2026-01-30 15:10:02
共享订阅是一种负载均衡的消费方式,属于同一个共享订阅组的多个客户端以轮询方式分配并接收订阅到的消息。因此,通常也称为消费者负载均衡。
例如下图中,发布者发布了4条消息:M1、M2、M3、M4。客户端 Client-0 和 Client-1 以共享订阅的方式消费订阅的主题消息。这与默认的 MQTT Pub/Sub 不同,Client-0 仅消费到了 M1, M3; Client-1 仅消费到了 M2, M4

在标准的 MQTT 协议中,共享订阅(Shared Subscription)是 MQTT 5.0 新增的功能,但是腾讯云消息队列 MQTT 版在服务端做了功能增强,除了 5.0 版本的客户端外,同时支持 3.1 和 3.1.1 版本的客户端,3.1 和 3.1.1 版本的客户端直接按照共享订阅的使用规范进行订阅即可。
在使用共享订阅前,需要在页面先创建共享订阅组(ShareName),否则客户端在订阅时会订阅失败。

约束与限制

单集群下最多支持创建20个共享订阅组;单个共享订阅组中订阅表达式数量上限为10个;单个共享订阅 Group 订阅和取消订阅请求上限总和 TPS 为10;单个共享订阅组下的客户端数量为1024个;达到限额后,客户端订阅消息请求 (SUBSCRIBE) 报错。若铂金版集群对于限额有特殊需求,可以提交工单联系我们。

前提条件

使用共享订阅方式订阅消息

步骤1:创建共享订阅组

1. 登录 MQTT 控制台
2. 在左侧导航栏单击 资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅组页签,单击 新建 创建共享订阅组,根据要求在弹窗中填写如下字段:
​​参数项​​
​​说明​​
​​示例值​​
共享订阅组名称
设置共享订阅组名称,需符合命名规则:不能为空,1-64个字符,支持字母、数字。
order_processing_group
负载均衡策略
○ 随机(默认策略,即 random):在不同的订阅客户端间随机分发负载。
○ 分区哈希:在一定程度上保证消息负载时的消息顺序性。选择该模式后,需要另外填写负载均衡生效的时间。即新的消费者客户端加入后,要在经过指定时延后,才会加入到负载均衡策略中。(当前默认实现 Topic-hash,即共享订阅中优先保证同一Topic下的消息顺序,若您需要 ClientID-hash请提交工单联系我们)
随机
描述
填写共享订阅组的说明信息,选填
订单处理多客户端负载
4. 单击 提交 后创建完成。


步骤2:在客户端配置共享订阅

使用方式

当期望以共享订阅的方式订阅消息时,订阅 topic-filter 以如下方式配置:
$share/{ShareName}/{TopicFilter}
参数
说明
$share
协议指定的使用共享订阅时的标记,固定字符串。
{ShareName}
在控制台创建的共享订阅组的名称,不能包含“ / ”,“ + ” , “ # ”。
{TopicFilter}
客户端正常订阅时使用的 Topic Filter,与 MQTT TopicFilter 要求 和语义相同。

示例代码

package org.apache.rocketmq.mqtt.example.quickstart;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SharedSubscriptionQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
String serverUri = "tcp://127.0.0.1:1883";
String clientId = "shared-sub-0";
try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
client.setTimeToWait(3000);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("YOUR-USERNAME");
options.setPassword("YOUR-PASSWORD".toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
client.connect(options);
int total = 1;
CountDownLatch latch = new CountDownLatch(total);

client.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) {
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
new String(message.getPayload()));
latch.countDown();
}

public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete: " + token.isComplete());
}
});
// 共享订阅表达式
// {ShareName} 这里为 Group0
// {topic-filter} 这里为 home/#
String topic = "$share/Group0/home/#";

// Subscribe
client.subscribe(topic, 1);
TimeUnit.HOURS.sleep(1);
client.disconnect();
}
}
}

共享订阅离线消息保留策略

${ShareName}下如果存在有效的Session订阅${TopicFilter}, 符合${TopicFilter}的离线消息就会保留,订阅者再次上线时,会从上次进度恢复消费。

Session 有效期

MQTT 3.1、3.1.1 通过clean-session定义Session的生命周期。 当 clean-session = true, Session 的生命周期与传输层生命周期一致。 当 clean-session = false, Session 与传输层生命周期无关。为避免资源浪费,产品定义传输层断开后,Session 最大存续 3 天。


按照 MQTT 5.0 协议,有以下等价语义:
MQTT 3.1、3.1.1
MQTT 5
clean-session = true
clean-start = true
session-expiry-interval = 0
clean-session = false
clean-start = false
session-expiry-interval = 259200

查看共享订阅组

1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
说明:
为了便于客户使用,列表页会自动展示当前集群已存在的共享订阅组的信息,即如果客户端使用了$share/{ShareName}/{topicfilter}进行共享订阅,则 ShareName 会自动出现在共享订阅组的列表,默认的负载均衡策略为随机策略。
4. 单击具体的共享订阅组名称进入查看共享订阅组的详情,如下图所示。详情页面会展示当前共享订阅的详情,当前共享订阅组(ShareName)下的订阅表达式(filter)以及点开后展示对应 filter 下的客户端和客户端状态。
说明:
在保证共享订阅消息负载的顺序性时,即负载均衡模式选择为“ 分区哈希”,客户端的在线状态会引入负载均衡时延的影响。如果客户端 session 已经不存在,只要在负载均衡的时效内,依然会展示为“在线”状态;客户端连接断开时间超出负载均衡生效时延后会展示为“离线”状态。


编辑共享订阅组

为了避免策略过渡期间出现消息重复或顺序性无法保证的问题,共享订阅组创建成功后,不支持修改共享订阅组名称和负载均衡策略。如果需要修改一个共享订阅组的负载均衡策略,可以在暂停客户端并删除共享订阅组后,重新创建并配置不同负载均衡策略的共享订阅组。
1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
4. 在共享订阅组列表的操作列,单击编辑进行共享订阅组信息的修改。
如果负载均衡策略是“随机”,则仅能修改描述部分。
如果负载均衡策略是“分区哈希”,则可以修改描述和负载均衡生效时间。

删除共享订阅组

删除共享订阅组时,需谨慎评估对线上业务的影响。MQTT 会对共享订阅组是否存在有效订阅进行校验。为避免意外中断服务,建议确保所有相关客户端 Session 均已下线后再执行删除操作。
1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
4. 在共享订阅组列表的操作列,单击删除,在弹窗中二次确认删除后即可完成删除。





帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈