Configuring SQL Filtering

Last updated: 2026-04-01 16:37:51

Background

The MQTT specification defines Topic Filters, which let subscribers select the messages they receive based on the hierarchical structure of MQTT topic names and on wildcards. Topics and wildcards provide robust filtering, but in scenarios such as grayscale releases, A/B testing, and system upgrades, Topic Filters alone cannot meet more flexible business requirements.

Implementation Principles

The MQTT 5.0 protocol introduces the Subscribe User Property mechanism. Building on this mechanism, TDMQ for MQTT attaches filtering semantics to subscribe user properties, which enables finer-grained message filtering. If the user properties of a subscribe request contain a property whose key is $where and whose value is a valid WHERE clause, the MQTT server evaluates this WHERE clause during message delivery and pushes only the messages that meet the conditions to the subscriber.




Basic Workflow

1. Subscription and Declaration: The subscriber initiates a subscribe request and declares its filtering conditions ($where) in the user properties.
2. Condition Parsing: The server parses the WHERE clause and checks that it is valid.
3. Message Matching: When messages are published, the server applies the subscriber's filtering conditions to the messages that match the topic filter.
4. Precise Delivery: Only messages that meet the conditions are delivered to the subscriber.
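
The four steps above can be pictured with a small local simulation. This is only a sketch of the matching logic, not the server's actual implementation: the class and method names are hypothetical, and a Java `Predicate` stands in for the parsed WHERE clause. The topic matching follows the standard MQTT wildcard rules (`+` for one level, `#` for the remaining levels).

```java
import java.util.Map;
import java.util.function.Predicate;

/** Local simulation of the filtering workflow; all names here are hypothetical. */
final class FilterWorkflowSketch {

    /** A subscription: a topic filter plus a predicate standing in for the parsed $where clause. */
    static final class Subscription {
        final String topicFilter;
        final Predicate<Map<String, String>> where;

        Subscription(String topicFilter, Predicate<Map<String, String>> where) {
            this.topicFilter = topicFilter;
            this.where = where;
        }
    }

    /** Standard MQTT topic-filter matching: '+' matches one level, '#' matches the remainder. */
    static boolean topicMatches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent level and everything below
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    /** Steps 3 and 4: the topic filter is checked first, then the WHERE predicate. */
    static boolean shouldDeliver(Subscription sub, String topic, Map<String, String> userProperties) {
        return topicMatches(sub.topicFilter, topic) && sub.where.test(userProperties);
    }

    public static void main(String[] args) {
        // Steps 1 and 2: subscribe to "home/#" with a condition equivalent to k1 = 'v1'.
        Subscription sub = new Subscription("home/#", props -> "v1".equals(props.get("k1")));

        System.out.println(shouldDeliver(sub, "home/room/1", Map.of("k1", "v1")));   // true
        System.out.println(shouldDeliver(sub, "home/room/1", Map.of("k1", "v2")));   // false
        System.out.println(shouldDeliver(sub, "office/room/1", Map.of("k1", "v1"))); // false
    }
}
```

The point of the sketch is that the WHERE clause narrows delivery further and never widens it: a message that does not match the topic filter is never evaluated against the condition.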

SQL Filtering Syntax

WHERE clauses support the operators and functions listed below, which can be combined to build flexible filtering conditions.

Supported Operators

| Type | Operator | Example | Description |
| --- | --- | --- | --- |
| Comparison Operator | =, !=, >, >=, <, <= | payload.temp > 30 | Compare numerical values or strings |
| Logical Operator | AND, OR, NOT | temp > 25 AND hum < 70 | Combine multiple conditions |
| Range Judgment | IN | clientid IN ('client1', 'client2') | Determine whether a field value is in a list |
| Null Check | IS NULL | payload.location IS NULL | Determine whether a field is null |
| Pattern Matching | LIKE | topic LIKE 'sensor/%/temp' | Perform simple wildcard matching |
| Conditional Expression | CASE WHEN...THEN...ELSE...END | CASE WHEN qos > 0 THEN 'important' ELSE 'normal' END | Implement conditional logic |
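
To make the LIKE row more concrete, the sketch below emulates LIKE matching in plain Java. It assumes the conventional SQL semantics (`%` matches any sequence of characters, `_` matches exactly one character), which this page does not spell out, so treat it as an illustration and not as a description of the server's matcher. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

/** Emulates SQL LIKE under the conventional semantics; an assumption, not the server's code. */
final class LikeSketch {

    /** Translates a LIKE pattern into a regex: '%' becomes ".*", '_' becomes ".". */
    static Pattern likeToRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters are taken literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true when the whole value matches the LIKE pattern; a null value never matches. */
    static boolean like(String value, String likePattern) {
        return value != null && likeToRegex(likePattern).matcher(value).matches();
    }

    public static void main(String[] args) {
        System.out.println(like("sensor/room1/temp", "sensor/%/temp")); // true
        System.out.println(like("sensor/a/b/temp", "sensor/%/temp"));   // true
        System.out.println(like("sensor/room1/hum", "sensor/%/temp"));  // false
    }
}
```

Under this assumption, `%` is not aware of topic levels: unlike the MQTT `+` wildcard, it can span several `/` separators, as the second call shows.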

Supported Functions

| Type | Function Example | Description |
| --- | --- | --- |
| String Function | UPPER(), LOWER(), LENGTH() | Process text data |
| Mathematical Function | ABS() | Calculate the absolute value |
| Conditional Function | COALESCE() | Return the first non-null value among the parameters |

Must-Knows

1. The user properties of a subscribe request should contain only one property with the key $where. If several $where properties are present, only the first one takes effect.
2. If the user properties of a message contain multiple key-value pairs with the same key, only the value of the last occurrence is used when the filter expression is evaluated.
3. If a field referenced in the filter expression does not exist in the message properties, its value is treated as null.
4. Enclose string literals in single quotes, for example: WHERE type = 'string-literal'.
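
The first three rules can be sketched as plain Java over lists of key-value pairs. This is a local illustration of the stated rules only; the `Prop` type and the method names are hypothetical and are not part of any SDK.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrates the resolution rules for $where and duplicate message keys; names are hypothetical. */
final class MustKnowsSketch {

    /** Minimal key-value pair standing in for an MQTT 5 user property. */
    static final class Prop {
        final String key;
        final String value;

        Prop(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Rule 1: among several $where properties in a subscribe request, the first one wins. */
    static String effectiveWhere(List<Prop> subscribeProps) {
        for (Prop p : subscribeProps) {
            if ("$where".equals(p.key)) {
                return p.value;
            }
        }
        return null; // no filter declared
    }

    /** Rule 2: for duplicate keys in a message, the last occurrence wins. */
    static Map<String, String> resolveMessageProps(List<Prop> messageProps) {
        Map<String, String> resolved = new HashMap<>();
        for (Prop p : messageProps) {
            resolved.put(p.key, p.value); // later entries overwrite earlier ones
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<Prop> subscribeProps = List.of(
                new Prop("$where", "where k1 = 'v1'"),
                new Prop("$where", "where k1 = 'v2'"));
        System.out.println(effectiveWhere(subscribeProps)); // where k1 = 'v1'

        Map<String, String> resolved = resolveMessageProps(List.of(
                new Prop("k1", "a"),
                new Prop("k1", "b")));
        System.out.println(resolved.get("k1")); // b
        // Rule 3: a field that is absent from the message resolves to null.
        System.out.println(resolved.get("k2")); // null
    }
}
```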

Examples

package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

/**
 * Subscribes to "home/#" with a $where user property, then publishes messages
 * to "home/room/1" so that the effect of the filter can be observed.
 */
public class BasicQuickStart {
    public static void main(String[] args) throws MqttException, InterruptedException {
        // Replace the access point, username, and password with your own values.
        String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";
        String clientId = "deviceBasic";

        String topic = "home/room/1";
        String[] topicFilters = new String[] {"home/#"};
        int[] qos = new int[] {1};

        MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName("YOUR-USERNAME");
        options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));

        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                System.out.println("Disconnected: " + response.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException e) {
                e.printStackTrace();
            }

            @Override
            public void messageArrived(String topicName, MqttMessage message) throws Exception {
                byte[] payload = message.getPayload();
                String content;
                if (4 == payload.length) {
                    // The publisher below sends a 4-byte integer counter.
                    ByteBuffer buf = ByteBuffer.wrap(payload);
                    content = String.valueOf(buf.getInt());
                } else {
                    content = new String(payload, StandardCharsets.UTF_8);
                }
                // Print the topic the message actually arrived on, not the publish topic.
                System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",
                    topicName, message.getQos(), content, message.getProperties());
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // Parentheses are required; otherwise the server URI is dropped on reconnect.
                System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
            }

            @Override
            public void authPacketArrived(int i, MqttProperties properties) {
            }
        });
        client.connect(options).waitForCompletion();
        try {
            // Subscribe
            MqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];
            for (int i = 0; i < topicFilters.length; i++) {
                subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);
            }
            // Declare the filter condition in a subscribe user property with the key $where.
            MqttProperties subscribeProperties = new MqttProperties();
            List<UserProperty> userProperties = new ArrayList<>();
            UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");
            userProperties.add(userProperty);
            subscribeProperties.setUserProperties(userProperties);
            client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }

        int total = 128;
        for (int i = 0; i < total; i++) {
            byte[] payload = new byte[4];
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            buffer.putInt(i);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(1);
            MqttProperties properties = new MqttProperties();
            properties.setContentType("application/json");
            properties.setResponseTopic("response/topic");
            // Added so that the message satisfies k1 = 'v1'. Without this user property,
            // k1 is treated as null and the filter above matches nothing.
            List<UserProperty> messageUserProperties = new ArrayList<>();
            messageUserProperties.add(new UserProperty("k1", "v1"));
            properties.setUserProperties(messageUserProperties);
            message.setProperties(properties);
            System.out.printf("Prepare to publish message %d%n", i);
            client.publish(topic, message);
            System.out.printf("Published message %d%n", i);
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // Keep the client alive for a while so that delivered messages can arrive.
        TimeUnit.MINUTES.sleep(3);
        client.disconnect();
    }
}

