tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

Go SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-23 17:07:05

操作场景

本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

1. 在客户端环境执行如下命令下载 RocketMQ 客户端相关的依赖包。请确保下载的版本高于 v2.1.2.rc2。
go get github.com/apache/rocketmq-client-go/v2
2. 在对应的方法内创建生产者,如您需要发送普通消息,则在 syncSendMessage.go 文件内修改对应的参数。
延时消息目前支持任意精度的延时,且不受 delay level 的影响。
普通消息
延时消息
// 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)
var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
// 授权角色名
var secretKey = "admin"
// 授权角色密钥
var accessKey = "eyJrZXlJZC...."
// 命名空间全称
var nameSpace = "MQ_INST_rocketmqem4xxxx"
// 生产者组名称
var groupName = "group1"
// 创建消息生产者
p, _ := rocketmq.NewProducer(
// 设置服务地址
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
// 设置acl权限
producer.WithCredentials(primitive.Credentials{
SecretKey: secretKey,
AccessKey: accessKey,
}),
// 设置生产组
producer.WithGroupName(groupName),
// 设置命名空间名称
producer.WithNamespace(nameSpace),
// 设置发送失败重试次数
producer.WithRetry(2),
)
// 启动producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}

// topic名称
var topicName = "topic1"
// 生产者组名称
var groupName = "group1"
// 创建消息生产者
p, _ := rocketmq.NewProducer(
// 设置服务地址
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),
// 设置acl权限
producer.WithCredentials(primitive.Credentials{
SecretKey: "admin",
AccessKey: "eyJrZXlJZC......",
}),
// 设置生产组
producer.WithGroupName(groupName),
// 设置命名空间名称
producer.WithNamespace("rocketmq-xxx|namespace_go"),
// 设置发送失败重试次数
producer.WithRetry(2),
)
// 启动producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 1; i++ {
msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
// 设置延迟等级
// 等级与时间对应关系:
// 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
//如果想用延迟级别,那么设置下面这个方法

msg.WithDelayTimeLevel(3)
//如果想用任意延迟消息,那么设置下面这个方法,WithDelayTimeLevel 就不要设置了,单位为具体的毫秒,如下则是10s后投递
delayMills := int64(10 * 1000)
msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+delayMills, 10))
// 发送消息
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\\n", err)
} else {
fmt.Printf("send message success: result=%s\\n", res.String())
}
}

// 释放资源
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

nameSpace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处置空即可。

serverAddress
集群接入地址,控制台集群基本信息页面的接入信息模块获取。 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)。

groupName
生产者组名称,在控制台 Group 管理页面复制。

3. 发送消息同上(以同步发送方式为例)。
// topic名称
var topicName = "topic1"
// 构造消息内容
msg := &primitive.Message{
Topic: topicName, // 设置topic名称
Body: []byte("Hello RocketMQ Go Client! This is a new message."),
}
// 设置tag
msg.WithTag("TAG")
// 设置key
msg.WithKeys([]string{"yourKey"})
// 发送消息
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\\n", err)
} else {
fmt.Printf("send message success: result=%s\\n", res.String())
}
参数
说明
topicName
Topic 的名称,在控制台 Topic 管理页面复制。

TAG
消息 TAG 标识。
yourKey
消息业务 key。
资源释放。
// 关闭生产者
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
说明:
异步发送、单向发送等,可参见 Demo 示例 或参见 rocketmq-client-go 示例
4. 创建消费者。
// 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)
var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
// 授权角色名
var secretKey = "admin"
// 授权角色密钥
var accessKey = "eyJrZXlJZC...."
// 命名空间全称
var nameSpace = "rocketmq-xxx|namespace_go"
// 生产者组名称
var groupName = "group11"
// 创建consumer
c, err := rocketmq.NewPushConsumer(
// 设置消费者组
consumer.WithGroupName(groupName),
// 设置服务地址
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
// 设置acl权限
consumer.WithCredentials(primitive.Credentials{
SecretKey: secretKey,
AccessKey: accessKey,
}),
// 设置命名空间名称
consumer.WithNamespace(nameSpace),
// 设置从起始位置开始消费
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
// 设置消费模式(默认集群模式)
consumer.WithConsumerModel(consumer.Clustering),
//广播消费,设置一下实例名,设置为应用的系统名即可。如果不设置,会使用pid,这会导致重启消费重复
consumer.WithInstance("xxxx"),
)
if err != nil {
fmt.Println("init consumer2 error: " + err.Error())
os.Exit(0)
}
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

nameSpace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。


serverAddress
集群接入地址,控制台集群基本信息页面的接入信息模块获取。 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)。

groupName
生产者组名称,在控制台 Group 管理页面复制。

5. 消费消息。
// topic名称
var topicName = "topic1"
// 设置订阅消息的tag
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expression: "TagA || TagC",
}
// 设置重新消费的延迟级别,共支持18种延迟级别。下面是延迟级别与延迟时间的对应关系
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
delayLevel := 1
err = c.Subscribe(topicName, selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback len: %d \\n", len(msgs))
// 设置下次消费的延迟级别
concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLater

for _, msg := range msgs {
// 模拟重试3次后消费成功
if msg.ReconsumeTimes > 3 {
fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)
return consumer.ConsumeSuccess, nil
} else {
fmt.Printf("subscribe callback: %v \\n", msg)
}
}
// 模拟消费失败,回复重试
return consumer.ConsumeRetryLater, nil
})
if err != nil {
fmt.Println(err.Error())
}
参数
说明
topicName
Topic 的名称,在控制台 Topic 页面复制。
Expression
消息 TAG 标识。
delayLevel
设置重新消费的延迟级别,共支持18种延迟级别。
6. 消费消息 (消费者消费消息必须在订阅之后)。
// 开始消费
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
// 资源释放
err = c.Shutdown()
if err != nil {
fmt.Printf("shundown Consumer error: %s", err.Error())
}
7. 查看消费详情。消息发送完成后得到一个消息 ID (messageID),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。

说明:
本文简单介绍了使用 Go 客户端进行简单的收发消息,更多操作可以下载 Demo,或者可参见 rocketmq-client-go 示例

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈