新功能发布记录
公告
<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限);// 设置NameServer的地址producer.setNamesrvAddr(nameserver);// 启动Producer实例producer.start();
参数 | 说明 |
groupName | 生产者组名称,建议使用对应的 topic 名字 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制 AccessKey。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制 SecretKey。 |
nameserver | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());// 设置消息延迟等级message.setDelayTimeLevel(5);// 发送消息SendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
int totalMessagesToSend = 1;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC_NAME, ("Hello timer message " + i).getBytes());// 设置发送消息的时间long timeStamp = System.currentTimeMillis() + 30000;// 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2022-08-08 08:08:08投递。// 若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。//将 __STARTDELIVERTIME 设定到 msg 的属性中message.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));// 发送消息SendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);
参数 | 说明 |
groupName | Group 的名称,在控制台 Group 管理页面复制 Group 名称。 4.x虚拟集群/专享集群:此处需拼接命名空间名称,格式为 namespace全称%group名称,例如 MQ_INSTxxx_aaa%GroupTest。4.x通用集群/5.x集群:此处无需拼接,填写 Group 名称即可。 ![]() |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制 AccessKey。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制 SecretKey。 |
nameserver | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
// 订阅topicpushConsumer.subscribe(topic_name, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
参数 | 说明 |
topic_name | Topic 的名称,在控制台 Topic 管理页面复制 Topic 名称。 4.x虚拟集群/专享集群:此处需拼接命名空间名称,格式为 namespace全称%topic名称,例如 MQ_INSTxxx_aaa%TopicTest。4.x通用集群/5.x集群:此处无需拼接,填写 Topic 名称即可。 ![]() |
"*" | 订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。 |

文档反馈