tencent cloud

消息队列 RabbitMQ 版

动态与公告
新功能发布记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 RabbitMQ 版
产品优势
应用场景
开源托管版与 Serverless 版差异说明
开源版本支持说明
与开源 RabbitMQ 对比
高可用
使用限制
RabbitMQ 相关概念
开区地域
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
步骤1:准备工作
步骤2:创建 RabbitMQ 集群
步骤3:配置 Vhost
步骤4:使用 SDK 收发消息
步骤5:查询消息
步骤6:销毁资源
用户指南
使用流程指引
配置账号权限
创建集群
配置 Vhost
连接集群
管理消息
配置高级特性
管理集群
查看监控和配置告警
实践教程
实践教程使用说明
RabbitMQ 客户端实践教程
RabbitMQ 消息可靠性实践教程
RabbitMQ 支持 MQTT 协议说明
迁移集群
迁移方案概述
步骤1:购买云上实例
步骤2:迁移元数据上云
步骤3:开启双读写
API 参考(开源托管版)
API 概览
API 参考(Serverless 版)
History
Introduction
API Category
Making API Requests
Relevant APIs for RabbitMQ Serverless PAAS Capacity
RabbitMQ Serverless Instance Management APIs
Data Types
Error Codes
SDK 文档
SDK 概述
Spring Boot Starter 接入
Spring Cloud Stream 接入
Java SDK
Go SDK
Python SDK
PHP SDK
安全与合规
权限管理
网络安全
删除保护
变更记录
云 API 审计
常见问题
服务等级协议
联系我们

Spring Cloud Stream 接入

PDF
聚焦模式
字号
最后更新时间: 2026-01-04 15:32:34

操作场景

本文以调用 Spring Cloud Stream SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:添加依赖

在 pom.xml 中添加Stream RabbitMQ相关依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

步骤2:准备配置

1. 在配置文件中进行相应配置 (以 direct 交换机配置为例)。
spring:
application:
name: application-name
cloud:
stream:
rabbit:
bindings:
# 输出channel名称
output:
# 生产者配置信息
producer:
# 生产者使用的交换机类型 如果已存在交换机名称,该类型必须与交换机类型一致
exchangeType: direct
# 用于指定 routing key 表达式
routing-key-expression: headers["routeTo"] # 该值表示使用头信息的routeTo字段作为 routing key
queueNameGroupOnly: true
# 输入channel名称
input:
# 消费者配置信息
consumer:
# 消费者使用的交换机类型 如果已存在交换机名称,该类型必须与交换机类型一致
exchangeType: direct
# 消费者消息队列绑定的 routing key
bindingRoutingKey: info,waring,error
# 该配置会对上面的 routing key 进行处理
bindingRoutingKeyDelimiter: "," # 该配置表示:使用,切割上面配置的routing key
# 消息确认模式 具体查看AcknowledgeMode
acknowledge-mode: manual
queueNameGroupOnly: true
bindings:
# 输出channel名称
output: #通道的名称
destination: direct_logs #要使用的exchange名称
content-type: application/json
default-binder: dev-rabbit
# 输入channel名称
input: #通道的名称
destination: direct_logs #要使用的exchange名称
content-type: application/json
default-binder: dev-rabbit
group: route_queue1 # 要使用的消息队列名称
binders:
dev-rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: amqp-xxx.rabbitmq.xxx.tencenttdmq.com #集群接入地址,在集群管理页面操作列的获取接入地址获取。
port: 5672
username: admin #角色名称
password: password #角色密钥
virtual-host: vhostnanme #Vhost名称
参数
说明
bindingRoutingKey
消费者消息队列绑定的 routing key,消息的路由规则,在控制台绑定关系列表的绑定 Key 列获取。
img


direct_log
Exchange 名称,在控制台 Exchange 列表获取。
route_queue1
Queue名称,在控制台 Queue 列表获取。
host
集群接入地址,集群接入地址,在集群基本信息页面的客户端接入模块获取。

port
集群接入地址端口,在集群基本信息页面的客户端接入模块获取。
username
用户名称,填写在控制台创建的用户名称。
password
用户密码,填写在控制台创建用户时填写的密码。
virtual-host
Vhost 名称,在控制台 Vhost 列表获取。
2. 创建配置文件加载程序。
OutputMessageBinding.java
public interface OutputMessageBinding {
/**
* 要使用的通道名称(输出channel名称)
*/
String OUTPUT = "output";

@Output(OUTPUT)
MessageChannel output();
}
InputMessageBinding.java
public interface InputMessageBinding {

/**
* 要使用的通道名称
*/
String INPUT = "input";

@Input(INPUT)
SubscribableChannel input();
}

步骤3:发送消息

创建并编译消息发送程序 IMessageSendProvider.java。
// 引入配置类
@EnableBinding(OutputMessageBinding.class)
public class MessageSendProvider {

@Autowired
private OutputMessageBinding outputMessageBinding;

public String sendToDirect() {
outputMessageBinding.output().send(MessageBuilder.withPayload("[info] This is a new message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "info").build());
outputMessageBinding.output().send(MessageBuilder.withPayload("[waring] This is a new waring message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "waring").build());
outputMessageBinding.output().send(MessageBuilder.withPayload("[error] This is a new error message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "error").build());
return "success";
}

public String sendToFanout() {
for (int i = 0; i < 3; i++) {
outputMessageBinding.output().send(MessageBuilder.withPayload("This is a new message" + i).build());
}
return "success";
}
}
在要发送消息的类中注入MessageSendProvider 即可进行发送消息。

步骤4:消费消息

创建并编译消息消费程序 MessageConsumer.java。可配置多个通道,可对不同消息队列的监听。
@Service
@EnableBinding(InputMessageBinding.class)
public class MessageConsumer {

@StreamListener(InputMessageBinding.INPUT)
public void test(Message<String> message) throws IOException {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
String payload = message.getPayload();
System.out.println(payload);
}
}

步骤5:查看消息

如果您想确认消息是否成功发送至 消息队列 RabbitMQ 版,可以在控制台 集群列表 > Queue 基本信息页面查看接入的消费者情况。

说明:
上述是基于 RabbitMQ 的发布订阅模型的一个简单示例,可根据实际使用进行不同配置,具体可参见 Demo 示例Spring cloud stream 官网

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈