tencent cloud

TDMQ for RocketMQ

Release Notes and Announcements
Release Notes
Announcements
Product Introduction
Introduction and Selection of the TDMQ Product Series
What Is TDMQ for RocketMQ
Strengths
Scenarios
Product Series
Comparison with Open-Source RocketMQ
High Availability
Quotas and Limits
Supported Regions
Basic Concepts
Billing
Billing Overview
Pricing
Billing Examples
Pay-as-you-go Switch to Monthly Subscription (5.x)
Renewal
Viewing Consumption Details
Refund
Overdue Payments
Getting Started
Getting Started Guide
Preparations
Step 1: Creating TDMQ for RocketMQ Resources
Step 2: Using the SDK to Send and Receive Messages (Recommended)
Step 2: Running the TDMQ for RocketMQ Client (Optional)
Step 3: Querying Messages
Step 4: Deleting Resources
User Guide
Usage Process Guide
Configuring Account Permissions
Creating the Cluster
Configuring the Namespace
Configuring the Topic
Configuring the Group
Connecting to the Cluster
Managing Messages
Managing the Cluster
Viewing Monitoring Data and Configuring Alarms
Cross-Cluster Message Replication
Use Cases
Naming Conventions for Common Concepts of TDMQ for RocketMQ
RocketMQ Client Use Cases
RocketMQ Performance Load Testing and Capacity Assessment
Access over HTTP
Client Risk Descriptions and Update Guide
Migration Guide for TencentCloud API Operations Related to RocketMQ 4.x Cluster Roles
Migration Guide
Disruptive Migration
Seamless Migration
Developer Guide
Message Types
Message Filtering
Message Retries
POP Consumption Mode (5.x)
Clustering Consumption and Broadcasting Consumption
Subscription Relationship Consistency
Traffic Throttling
​​API Reference(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
​​API Reference(4.x)
SDK Reference
SDK Overview
5.x SDK
4.x SDK
Security and Compliance
Permission Management
CloudAudit
Deletion Protection
FAQs
4.x Instance FAQs
Agreements
TDMQ for RocketMQ Service Level Agreement
Contact Us

POP Consumption Mode (5.x)

PDF
Focus Mode
Font Size
Last updated: 2026-01-23 17:52:23

Issue Background

RocketMQ is well-known to many customers and developers for its high performance, low latency, and backlog resistance. However, when using the RocketMQ 4.x client SDK, certain customers report that the consumer client encounters issues during actual message consumption with the 4.x clients (such as the commonly used Push Consumer):
The SDK undertakes too many features, such as pulling messages, load balancing, message queue offset management, and rebalance when new clients are added. This is not developer-friendly for those working with multiple programming languages.
The queue-exclusive load balancing policy can easily lead to consumption bottlenecks. Each queue on the broker can only be assigned to one consumer client within the same consumer group. Therefore, when the number of queues is fixed, simply increasing the number of consumer clients does not improve consumption performance. For example, if a topic has 10 queues, the group can have at most 10 clients consuming messages (at most one queue per client). During peak business hours, even if a customer wants to add a new client to consume messages, the 11th client that comes online will be unable to consume any messages.
A single client failure can cause message backlog. If a single client hangs due to an exception but its heartbeat with the server remains connected, the server will still assign queues to this client for consumption. Since the client is actually unable to process messages due to the exception, a backlog starts to accumulate. Furthermore, as explained in the previous point, merely adding more clients cannot resolve this issue.

Solutions

Given the reasons above, the POP consumption mode was introduced in the 5.x version.
In POP mode, the consumer offset is managed by the server, allowing multiple clients to consume from the same queue. Clients using the POP mode pull messages from all queues, thereby solving the issues of single-client failure and consumption bottlenecks mentioned above.
Furthermore, as the server maintains the consumption information, the client SDK becomes more lightweight, facilitating easier multi-language porting.


Sample Code

So, how do we use the POP consumption mode?
The 5.x gRPC SDK is required. Include the following dependencies:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
</dependencies>
Also, refer to the following demo from the open-source community (using Java code as an example):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.java.example;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerExample {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

private SimpleConsumerExample() {
}

@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
String accessKey = "User AccessKey";
String secretKey = "User SecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

String endpoints = "Tencent Cloud page access point";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not essential.
// .enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "Consumer group";
// The default consumption timeout is 30 seconds. This means if a pulled message is not consumed within 30 seconds, it can be pulled again by another client.
// Configure this based on your actual scenario.
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "*";
String topic = "Topic name";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
// Receive message, multi-threading is more recommended.
do {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
// Close the simple consumer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}
In this scenario, a consumer within the same consumer group is no longer exclusively bound to specific queues. This also largely prevents the queue backlog issue seen in 4.x, which was caused by the blocking of a single consumer.

Help and Support

Was this page helpful?

Help us improve! Rate your documentation experience in 5 mins.

Feedback