新功能发布记录

参数项 | 说明 | 示例值 |
共享订阅组名称 | 设置共享订阅组名称,需符合命名规则:不能为空,1-64个字符,支持字母、数字。 | order_processing_group |
负载均衡策略 | ○ 随机(默认策略,即 random):在不同的订阅客户端间随机分发负载。 ○ 分区哈希:在一定程度上保证消息负载时的消息顺序性。选择该模式后,需要另外填写负载均衡生效的时间。即新的消费者客户端加入后,要在经过指定时延后,才会加入到负载均衡策略中。(当前默认实现 Topic-hash,即共享订阅中优先保证同一Topic下的消息顺序,若您需要 ClientID-hash请提交工单联系我们) | 随机 |
描述 | 填写共享订阅组的说明信息,选填 | 订单处理多客户端负载 |

$share/{ShareName}/{TopicFilter}
参数 | 说明 |
$share | 协议指定的使用共享订阅时的标记,固定字符串。 |
{ShareName} | 在控制台创建的共享订阅组的名称,不能包含“ / ”,“ + ” , “ # ”。 |
{TopicFilter} |
package org.apache.rocketmq.mqtt.example.quickstart;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class SharedSubscriptionQuickStart {public static void main(String[] args) throws MqttException, InterruptedException {String serverUri = "tcp://127.0.0.1:1883";String clientId = "shared-sub-0";try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("YOUR-USERNAME");options.setPassword("YOUR-PASSWORD".toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);client.connect(options);int total = 1;CountDownLatch latch = new CountDownLatch(total);client.setCallback(new MqttCallback() {public void messageArrived(String topic, MqttMessage message) {System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),new String(message.getPayload()));latch.countDown();}public void connectionLost(Throwable cause) {System.out.println("connectionLost: " + cause.getMessage());}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete: " + token.isComplete());}});// 共享订阅表达式// {ShareName} 这里为 Group0// {topic-filter} 这里为 home/#String topic = "$share/Group0/home/#";// Subscribeclient.subscribe(topic, 1);TimeUnit.HOURS.sleep(1);client.disconnect();}}}

MQTT 3.1、3.1.1 | MQTT 5 | |
clean-session = true | clean-start = true | session-expiry-interval = 0 |
clean-session = false | clean-start = false | session-expiry-interval = 259200 |
$share/{ShareName}/{topicfilter}进行共享订阅,则 ShareName 会自动出现在共享订阅组的列表,默认的负载均衡策略为随机策略。

文档反馈