property 属性中增加 __STARTDELIVERTIME 属性值,就能在一定范围内(40天)实现该消息在任意时间的定时发送。延时消息则可以先通过计算得到定时发送的时间点,再以定时消息的形式发送。System.currentTimeMillis() + delayTime 计算得到定时发送的时间点,再以定时消息的方式发送。Message msg = new Message("test-topic", ("message content").getBytes(StandardCharsets.UTF_8));// 设定消息在10秒之后被发送long delayTime = System.currentTimeMillis() + 10000;// 将 __STARTDELIVERTIME 设定到 msg 的属性中msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));SendResult result = producer.send(msg);System.out.println("Send delay message: " + result);
Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey-3ee439f945d7") // Set expected delivery timestamp of message. 设置延迟消息的时间 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build();
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

// 在Producer端设置消息为延时消息Message msg = new Message();msg.setTopic("TopicA");msg.setTags("Tag");msg.setBody("this is a delay message".getBytes());// 设置延迟level为5,对应延迟1分钟msg.setDelayTimeLevel(5);producer.send(msg);

/*** Represents a slot of timing wheel. Format:* ┌────────────┬───────────┬───────────┬───────────┬───────────┐* │delayed time│ first pos │ last pos │ num │ magic │* ├────────────┼───────────┼───────────┼───────────┼───────────┤* │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │* └────────────┴───────────┴───────────┴───────────┴───────────┘*/
public final static int UNIT_SIZE = 4 //size+ 8 //prev pos+ 4 //magic value+ 8 //curr write time, for trace+ 4 //delayed time, for check+ 8 //offsetPy+ 4 //sizePy+ 4 //hash code of real topic+ 8; //reserved value, just in case of

TimerEnqueueGetService 从消息文件读取得到定时主题的消息,并先将其放入 EnqueuePutQueue。TimerEnqueuePutService 将其执行 Timerlog-unit 构建逻辑并放入 TimerLog,更新时间轮(Timewheel)的存储内容。TimerDequeueGetService 每 50ms 读一次下一秒的 Slot,从 TimerLog 中得到指定的数据,并放进 dequeueGetQueue。TimerDequeueGetMessageService 从 dequeueGetQueue 中取出数据并根据索引信息,从消息文件中查出对应的 msgs,并将其放入待写入消息文件的队列中(dequeuePutQueue)。TimerDequeuePutMessageService 将这个 Putqueue 中的消息取出,若已到期则修改 Topic,放回 Commitlog(投递到真正的 Topic),否则继续按指定主题(TIMER_TOPIC)写回 CommitLog 滚动(避免消息过期)。Message message = new Message(TOPIC, ("Hello" + i).getBytes(StandardCharsets.UTF_8));// 延迟 10s 后投递message.setDelayTimeSec(10);// 延迟 10000ms 后投递,投递到服务端后计算定时时间,即投递到服务端的时间+delayTimemessage.setDelayTimeMs(10_000L);// 定时投递,定时时间为当前时间 + 10000msmessage.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);// 发送消息SendResult result = producer.send(message);

文档反馈