tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

4.x 集群使用 4.x SDK 收发普通消息

PDF
聚焦模式
字号
最后更新时间: 2026-01-23 16:46:08

操作场景

消息队列 RocketMQ 版支持多种语言的 SDK 收发不同类型的消息,本文以调用 Java SDK 为例介绍通过 4.x SDK 连接消息队列 RocketMQ 版服务端实现普通消息收发的操作过程,

前提条件

已完成前期的 RocketMQ 集群资源创建。
已参考准备工作完成 Linux 服务器准备和环境配置。

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
说明:
依赖版本要求 ≥ 4.9.3。
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.3</version>
</dependency>

步骤2:生产消息

1. 创建消息生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
namespace,
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
// ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();
参数
说明
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是 4.x 通用集群,此处填写集群 ID 即可。



groupName
生产者组名称,在控制台集群管理中 Group 页签中复制。
nameserver
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的角色管理页面 SecretKey 列复制。

accessKey
角色密钥,在控制台的角色管理页面 AccessKey 列复制。

2. 发送消息

发送消息有多种方式:同步发送、异步发送、单向发送等。
同步发送
异步发送
单向发送
for (int i = 0; i < 10; i++) {
// 创建消息实例,设置topic和消息内容
Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
参数
说明
topic_name
Topic 的名称,在控制台 Topic 管理页面复制。

TAG
用来设置消息的 TAG。
// 设置发送失败后不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
// 设置发送消息的数量
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
// 创建消息实体,设置topic和消息内容
Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功逻辑
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
// 消息发送失败逻辑
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
参数
说明
topic_name
Topic 的名称,在控制台 Topic 管理页面复制。

TAG
用来设置消息的 TAG。
for (int i = 0; i < 10; i++) {
// 创建消息实例,设置topic和消息内容
Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送单向消息
producer.sendOneway(msg);
}
参数
说明
topic_name
Topic 的名称,在控制台 Topic 管理页面复制。

TAG
用来设置消息的 TAG。
说明:
批量发送及其他情况可参见 Demo社区文档

步骤3:消费消息

1. 创建消费者

TDMQ RocketMQ 版支持 push 和 pull 两种消费模式。
push 消费者
pull 消费者
// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
namespace,
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);
参数
说明
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是 4.x 通用集群,此处填写集群 ID 即可。



groupName
生产者组名称,在控制台集群管理中 Group 页签中复制。
nameserver
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的角色管理页面 SecretKey 列复制。

accessKey
角色密钥,在控制台的角色管理页面 AccessKey 列复制。
// 实例化消费者
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(
namespace,
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(nameserver);
// 设置从第一个偏移量开始消费
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
参数
说明
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是 4.x 通用集群,此处填写集群 ID 即可。



groupName
生产者组名称,在控制台集群管理中 Group 页签中复制。
nameserver
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的角色管理页面 SecretKey 列复制。

accessKey
角色密钥,在控制台的角色管理页面 AccessKey 列复制。
说明:
更多消费类型可参见 DemoRocketMQ 官方文档

2. 订阅消息

根据消费模式不同,订阅方式也有所区别。
push 订阅
pull 订阅
// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费, 根据消费情况,返回处理状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();
参数
说明
topic_name
Topic 的名称,在控制台 Topic 管理页面复制。

"*"
订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。
// 订阅topic
pullConsumer.subscribe(topic_name, "*");
// 启动消费者实例
pullConsumer.start();
try {
System.out.printf("Consumer Started.%n");
while (true) {
// 拉取消息
List<MessageExt> messageExts = pullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
pullConsumer.shutdown();
}
参数
说明
topic_name
Topic 的名称,在控制台 Topic 管理页面复制。

"*"
订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈