tencent cloud

消息队列 RabbitMQ 版

动态与公告
新功能发布记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 RabbitMQ 版
产品优势
应用场景
开源托管版与 Serverless 版差异说明
开源版本支持说明
与开源 RabbitMQ 对比
高可用
使用限制
RabbitMQ 相关概念
开区地域
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
步骤1:准备工作
步骤2:创建 RabbitMQ 集群
步骤3:配置 Vhost
步骤4:使用 SDK 收发消息
步骤5:查询消息
步骤6:销毁资源
用户指南
使用流程指引
配置账号权限
创建集群
配置 Vhost
连接集群
管理消息
配置高级特性
管理集群
查看监控和配置告警
实践教程
实践教程使用说明
RabbitMQ 客户端实践教程
RabbitMQ 消息可靠性实践教程
RabbitMQ 支持 MQTT 协议说明
迁移集群
迁移方案概述
步骤1:购买云上实例
步骤2:迁移元数据上云
步骤3:开启双读写
API 参考(开源托管版)
API 概览
API 参考(Serverless 版)
History
Introduction
API Category
Making API Requests
Relevant APIs for RabbitMQ Serverless PAAS Capacity
RabbitMQ Serverless Instance Management APIs
Data Types
Error Codes
SDK 文档
SDK 概述
Spring Boot Starter 接入
Spring Cloud Stream 接入
Java SDK
Go SDK
Python SDK
PHP SDK
安全与合规
权限管理
网络安全
删除保护
变更记录
云 API 审计
常见问题
服务等级协议
联系我们

配置延时消息

PDF
聚焦模式
字号
最后更新时间: 2026-01-04 15:30:30
本文主要介绍消息队列 TDMQ RabbitMQ 版中延迟消息的概念、使用场景和实现方式。

名词解释

延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息。

使用场景

场景 1:对于消息的生产消费时间有要求的场景。例如在电商系统中,若用户下单后 30 分钟不支付,自动取消订单。
场景 2:通过消息触发延时任务的场景。用户登录 App 浏览特定商品 20 分钟后还没下单,自动推送商品评测信息的消息并发放商品相关优惠券。

实现方式

托管版集群
Serverless 版集群

约束与限制

下方实现方式二中,开源托管版集群的延时消息能力需要依赖 rabbitmq_delayed_message_exchange 插件,请先在插件管理页面打开插件,具体请见管理插件
延时消息的时间值必须为非负整数,单位为毫秒。
如果您的延时消息设置了消息存活时间,则 延时消息的实际存活时间 = min{消息存活时间, Queue存活时间} + 延时时间。

使用方式

方式一:通过设置消息的过期时间和死信队列实现延时消息

原理概述
发送端通过设置消息过期时间,触发过期未消费的消息被投递到死信交换机下的死信队列。
消费端通过消费死信队列中的消息,实现延时消息的消费。
使用示例
代码仅示例使用,交换机类型、routingKey 等参数请结合实际场景替换为业务预期的值。
发送端代码示例如下,消费端订阅死信队列即可。
// 声明死信交换机
channel.exchangeDeclare("${dlxExchangeName}", "direct", true);
// 声明用于发送延时消息的交换机
channel.exchangeDeclare("${delayExchangeName}", "direct", true);
// 声明用于发送延时消息的队列,并指定其死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "${dlxExchangeName}");
channel.queueDeclare("${delayQueueName}", true, false, false, args);
channel.queueBind("${delayQueueName}", "${delayExchangeName}", "");

// 声明死信队列
channel.queueDeclare("${delayQueueName}", true, false, false, null);
channel.queueBind("${dlxQueueName}", "${dlxExchangeName}", "");

// 发送延时消息
int delayInSeconds = 10; // 消息延时 10s
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayInSeconds * 1000));
channel.basicPublish("${delayExchangeName}", "", props.build(), "delayed payload".getBytes());
参数说明如下
参数
说明
${dlxExchangeName}
用于延迟消息的死信交换机名称,请替换为可在控制台 Exchange 列表查询到的名称。
${delayExchangeName}
实际发送延时消息的交换机名称,请替换为可在控制台 Exchange 列表查询到的名称。
x-dead-letter-exchange
队列参数 key,用于设置其对应的死信交换机。
${dlxQueueName}
用于延迟消息的死信队列名称
${delayQueueName}
实际发送延时消息的队列名称

方式二:通过内置的 rabbitmq_delayed_message_exchange 插件实现延时消息

使用限制
说明:
更多详细信息请参考 RabbitMQ 官方插件使用限制说明
当前插件的设计不适用于大量延迟消息(未调度的消息达数十万甚至数百万条)的场景,生产环境请谨慎评估消息量级,避免非预期的长时间延迟、消息丢失等问题。
延时消息在每个节点上只有一个持久化副本,如果节点无法正常运行(例如由于消息堆积导致持续 OOM 后重启且无法恢复),则该节点上的延时消息无法被消费端消费。
延时交换机不支持设置 mandatory ,生产者无法通过 basic.return 事件感知到无法路由的消息,因此发送延时消息前请务必保证对应的交换机、队列、路由关系存在。
综上所述,我们强烈建议您不要使用这一插件,转而使用死信队列来间接实现延时消息。如果您在了解了这个插件的若干缺陷后仍然要使用,那我们强烈推荐您将延迟消息数量保持在一个尽可能低的水平,避免触发内存负载高的问题。
使用示例
TDMQ RabbitMQ 版延时消息的使用方式和 RabbitMQ 官方支持的延时插件的使用方式完全一致,方便业务进行无改造迁移。收发消息示例可以参考使用 SDK 收发消息,延时消息的相关改造可以参考下方示例:
1. 声明 Exchange 并指定 Exchange 的路由类型。
// 声明延时交换机
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("${delayedExchangeName}", "x-delayed-message", true, false, args);
参数说明如下:
参数
说明
x-delayed-type
Exchange 的类型,指定路由规则。取值说明如下:
direct
fanout
topic
具体说明请参见 Exchange
${delayedExchangeName}
Exchange 的名称,请替换为可在控制台 Exchange 列表查询到的名称。
x-delayed-message
指定 Exchange 类型,以支持投递延时消息。
2. 发送延时消息。在消息的 Header 属性中增加一个键为 x-delay,值为毫秒数的键值对,并且指定发送的目标 Exchange 为上一步已声明的 Exchange。
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 4000); // 消息延时 4s
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("${delayedExchangeName}", "", props.build(), messageBodyBytes);
当消息到达 Exchange 后,会在4000毫秒后投递到对应的 Queue。

约束与限制

Serverless 集群支持延时消息,且不需要依赖打开开源延时消息插件。
延时消息的时间值必须为非负整数,单位为毫秒。
如果您的延时消息设置了消息存活时间,则 延时消息的实际存活时间 = min{消息存活时间, Queue存活时间} + 延时时间 Serverless版集群的最大延时时间限制,请见使用限制。具体限制场景如下:
消息只设置了延时时间,没有设置存活时间,且延时时间 > 集群最大延时时间限制,则消息会被当作普通消息处理,即消息生产后会被立即投递给消费者消费。
消息设置了存活时间,且延时消息的实际存活时间 > 集群最大延时时间限制,则客户端会收到连接断开的报错,消息会生产失败。

使用方式

步骤
生产消息时,在消息 header 属性中增加 X-delay 键,值为消息的延时时间(单位:毫秒)。
示例
byte[] messageBodyBytes = "delayed message".getBytes("UTF-8"); //定义消息主体
Map<String, Object> headers = new HashMap<String, Object>(); //创建Map集合,存储消息的自定义Header信息
headers.put("x-delay", 1000); //向Header添加延时消息键值对,指定消息的延时时间为1000毫秒。
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); //将Header信息设置到消息属性中
channel.basicPublish("ExchangeName", "Routing Key", props.build(), messageBodyBytes); //把消息发送到指定的Exchange


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈