tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

Spring Cloud Stream 接入

PDF
聚焦模式
字号
最后更新时间: 2026-01-23 17:07:04

操作场景

本文以调用 Spring Cloud Stream 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:引入依赖

在 pom.xml 中引入 spring-cloud-starter-stream-rocketmq 相关依赖。当前建议版本 2021.0.4.0。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.0.4.0</version>
</dependency>

步骤2:添加配置

在配置文件中增加 RocketMQ 相关配置。
spring:
cloud:
stream:
rocketmq:
binder:
# 服务地址全称
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
# 角色名称
secret-key: admin
# 角色密钥
access-key: eyJrZXlJZ...
# namespace全称
namespace: rocketmq-xxx|namespace1
# producer group
group: producerGroup
bindings:
# channel名称, 与spring.cloud.stream.bindings下的channel名称对应
Topic-TAG1-Input:
consumer:
# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
subscription: TAG1
# channel名称
Topic-TAG2-Input:
consumer:
subscription: TAG2
bindings:
# channel名称
Topic-send-Output:
# 指定topic, 对应创建的topic名称
destination: TopicTest
content-type: application/json
# channel名称
Topic-TAG1-Input:
destination: TopicTest
content-type: application/json
group: consumer-group1
# channel名称
Topic-TAG2-Input:
destination: TopicTest
content-type: application/json
group: consumer-group2
注意
1. 目前只有 2.2.5-RocketMQ-RC12.2.5.RocketMQ.RC2 及以上版本支持 namespace 配置,如使用别的版本需要对 topic 和 group 名称进行拼接。
格式如下:
rocketmq-pngrpmk94d5o|stream%topic (格式为:namespace全称%topic名称)
rocketmq-pngrpmk94d5o|stream%group (格式为:namespace全称%group名称)
新的虚拟集群与专享集群格式如下:
MQ_INST_rocketmqpj79obd2ew7v_test%topic (格式为:namespace全称%topic名称)
MQ_INST_rocketmqpj79obd2ew7v_test%group (格式为:namespace全称%group名称)
2. 配置方面 2.2.5-RocketMQ-RC12.2.5.RocketMQ.RC2 的订阅配置项为 subscription , 其他低版本订阅配置项为 tags
其他版本完整配置项参考如下:
spring:
cloud:
stream:
rocketmq:
bindings:
# channel名称, 与spring.cloud.stream.bindings下的channel名称对应
Topic-test1:
consumer:
# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
tags: TAG1
# channel名称
Topic-test2:
consumer:
tags: TAG2
binder:
# 服务地址全称
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
# 角色名称
secret-key: admin
# 角色密钥
access-key: eyJrZXlJZ...
bindings:
# channel名称
Topic-send:
# 指定topic, 对应创建的topic全称,格式为:集群id|namespace名称%topic名称
destination: rocketmq-xxx|stream%topic1
content-type: application/json
# 要使用group全称称,格式为:集群id|namespace名称%group名称
group: rocketmq-xxx|stream%group1
# channel名称
Topic-test1:
destination: rocketmq-xxx|stream%topic1
content-type: application/json
group: rocketmq-xxx|stream%group1
# channel名称
Topic-test2:
destination: rocketmq-xxx|stream%topic1
content-type: application/json
group: rocketmq-xxx|stream%group2
参数
说明
name-server
集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
secret-key
角色名称,在控制台的集群权限页面 SecretKey 列复制。
access-key
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。

group
生产者组名称,在控制台 Group 管理页面复制。

destination
Topic 的名称,在控制台 Topic 管理页面复制。


步骤3:配置 channel

channel 分为输入和输出,可根据自己的业务进行单独配置。
/**
* 自定义通道 Binder
*/
public interface CustomChannelBinder {

/**
* 发送消息(消息生产者)
* 绑定配置中的channel名称
*/
@Output("Topic-send-Output")
MessageChannel sendChannel();


/**
* 接收消息1(消费者1)
* 绑定配置中的channel名称
*/
@Input("Topic-TAG1-Input")
MessageChannel testInputChannel1();

/**
* 接收消息2(消费者2)
* 绑定配置中的channel名称
*/
@Input("Topic-TAG2-Input")
MessageChannel testInputChannel2();
}


步骤4:添加注解

在配置类或启动类上添加相应注解,如果有多个 binder 配置,都要在此注解中进行指定。
@EnableBinding({CustomChannelBinder.class})

步骤5:发送消息

1. 在要发送消息的类中,注入 CustomChannelBinder
@Autowired
private CustomChannelBinder channelBinder;
2. 发送消息,调用对应的输出流 channel 进行消息发送。
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
channelBinder.sendChannel().send(message);

步骤6:消费消息

@Service
public class StreamConsumer {
private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);

/**
* 监听channel (配置中的channel 名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-TAG1-Input")
public void receive(String messageBody) {
logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);
}

/**
* 监听channel (配置中的channel 名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-TAG2-Input")
public void receive2(String messageBody) {
logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);
}
}

步骤7:本地测试

本地启动项目之后,可以从控制台看到启动成功。
浏览器访问 http://localhost:8080/test-simple可以看到发送成功。观察开发 IDE 的输出日志。
2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: 通过stream发送消息,messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: 通过stream收到消息,messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}

可以看到。发送了一条 TAG1 的消息,同时也只有 TAG1 的订阅者收到了消息。
说明
具体使用可参见 DemoSpring Cloud Stream 官网


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈