新功能发布记录

通配符 | 说明 | 示例 |
+ | 单层通配符,代表匹配一个且仅一个任意主题层级 | 订阅 sensor/+/temperature匹配 sensor/bedroom/temperature匹配 sensor/living_room/temperature不匹配 sensor/bedroom/upstairs/temperature (因为+只能匹配一层)不匹配 sensor/temperature (因为 sensor 和 temperature 之间缺少一层) |
# | 多层通配符,代表匹配零个或多个任意主题层级,必须是主题过滤器的最后一个字符 | 订阅 sensor/bedroom/#匹配 sensor/bedroom/temperature匹配 sensor/bedroom/humidity匹配 sensor/bedroom/light/intensity匹配 sensor/bedroom (# 可以匹配零层)不匹配 sensor/living_room/temperature (因为 bedroom 是固定的) |
QoS 等级 | 工作流程 | 优点 | 缺点 | 适用场景 |
QoS = 0 (最多分发一次) | 发送者将消息发送出去,不要求接收方进行确认 | 传输速度最快,开销最小 | 消息可能会丢失,例如网络故障、接收方不在线情况 | 非关键性的数据,允许偶尔丢失,例如周期性的传感器数据(温度、湿度),丢失下一个数据点很快会补上。 |
QoS = 1 (至少达到一次) | 1. 发送者发送消息并保留一个副本。 2. 接收者收到消息后,必须回复一个 PUBACK (发布确认) 报文。 3. 发送者收到 PUBACK 后,才会丢弃消息副本。 4. 如果发送者在合理时间内未收到 PUBACK,它会重新发送该消息。 | 保证了消息不会丢失 | 可能导致消息重复 | 需要保证消息必达,但可以接受偶尔重复的场景,例如控制指令(“开关灯”),重复执行一次可能也没问题。 |
QoS = 2 (仅到达一次) | 1. PUBLISH:发送者发送消息,并保留副本。 2. PUBREC:接收者收到后回复一个“已收到”确认。如果发送者没收到 PUBREC,会重发 PUBLISH。 3. PUBREL:发送者收到 PUBREC 后,发送一个“发布释放”报文,并可以丢弃消息副本。它现在只需要等待最终确认。 4. PUBCOMP:接收者收到 PUBREL 后,回复一个“发布完成”确认。之后才将消息交付给应用。发送者收到 PUBCOMP,流程结束。如果任何一方没收到应答,都会重发上一条报文。 | 既保证了消息不丢失,又确保了不会重复 | 速度最慢,开销最大 | 对可靠性和准确性要求极高的关键业务,例如计费系统、金融交易、关键状态同步,任何重复或丢失都会造成严重后果。 |
retained参数等于true,Broker 会为这个 Topic 保存最新的这条保留消息。sensor/temperature 后,能立刻收到当前的最新温度,而不必等待下一个小时。package com.tencent.tdmq.mqtt.example.paho.v5;import java.nio.ByteBuffer;import java.nio.charset.StandardCharsets;import java.util.List;import java.util.concurrent.TimeUnit;import org.eclipse.paho.mqttv5.client.IMqttToken;import org.eclipse.paho.mqttv5.client.MqttCallback;import org.eclipse.paho.mqttv5.client.MqttClient;import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;import org.eclipse.paho.mqttv5.common.MqttException;import org.eclipse.paho.mqttv5.common.MqttMessage;import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import org.eclipse.paho.mqttv5.common.packet.UserProperty;public class SubscriberQuickStart {public static void main(String[] args) throws MqttException, InterruptedException {// 从MQTT控制台获取接入点:// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059String clientId = "SubscriberQuickStart";// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码String username = "user0";String password = "secret0";// MQTT topic filtersString[] topicFilters = new String[]{"home/test", "home/#", "home/+"};int[] qos = new int[]{1, 1, 1};MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());client.setTimeToWait(3000);MqttConnectionOptions options = new MqttConnectionOptions();options.setUserName(username);options.setPassword(password.getBytes(StandardCharsets.UTF_8));options.setCleanStart(true);options.setAutomaticReconnect(true);client.setCallback(new MqttCallback() {@Overridepublic void disconnected(MqttDisconnectResponse response) {System.out.println("Disconnected: " + response.getReasonString());}@Overridepublic void mqttErrorOccurred(MqttException e) {e.printStackTrace();}@Overridepublic void messageArrived(String topic, MqttMessage message) {byte[] payload = message.getPayload();String content;if (4 == payload.length) {ByteBuffer buf = ByteBuffer.wrap(payload);content = String.valueOf(buf.getInt());} else {content = new String(payload, StandardCharsets.UTF_8);}System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",topic, message.getQos(), content);List<UserProperty> userProperties = message.getProperties().getUserProperties();printUserProperties(userProperties);}@Overridepublic void deliveryComplete(IMqttToken token) {System.out.println("Delivery complete for packet-id: " + token.getMessageId());}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);try {// SubscribeIMqttToken token = client.subscribe(topicFilters, qos);int[] reasonCodes = token.getReasonCodes();for (int i = 0; i < reasonCodes.length; i++) {System.out.printf("Subscribed to topic %s with QoS=%d, Granted-QoS: %d%n",topicFilters[i], qos[i], reasonCodes[i]);}if (token.isComplete()) {List<UserProperty> userProperties = token.getResponseProperties().getUserProperties();printUserProperties(userProperties);}} catch (MqttException e) {e.printStackTrace();}}@Overridepublic void authPacketArrived(int i, MqttProperties properties) {System.out.println("Received auth packet with id: " + i);}});client.connect(options);TimeUnit.MINUTES.sleep(5);client.disconnect();client.close();}static void printUserProperties(List<UserProperty> userProperties) {if (null != userProperties) {for (UserProperty userProperty : userProperties) {System.out.printf("User property: %s = %s%n", userProperty.getKey(), userProperty.getValue());}}}}
文档反馈