Implementation Principles
The CKafka connector includes a built-in MQTT source plugin that uses the MQTT shared subscription mechanism to ingest MQTT messages in real time and forward them to CKafka clusters. Shared subscriptions allow consumption concurrency to be scaled up, which sustains high data transmission throughput and meets the high-traffic ingestion and processing demands of integrating Kafka with the big data ecosystem.
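The load-balancing behavior of shared subscriptions can be illustrated with a minimal Python sketch. The class and names below are purely illustrative (not the connector's implementation), and round-robin dispatch is an assumption; an actual broker may use a different strategy among group members.

```python
# Sketch: how MQTT shared subscriptions spread load across consumers.
# Clients subscribing to "$share/<group>/<topic>" form a group; the broker
# delivers each message to only ONE member of the group. Round-robin is
# used here for illustration; real brokers may dispatch differently.
from collections import defaultdict
from itertools import cycle

class SharedSubscriptionBroker:
    def __init__(self):
        self._groups = defaultdict(list)   # (group, topic) -> subscribed clients
        self._dispatch = {}                # (group, topic) -> round-robin iterator

    def subscribe(self, client_id, shared_topic):
        # Parse "$share/<group>/<topic>" per the MQTT 5.0 shared-subscription syntax.
        _, group, topic = shared_topic.split("/", 2)
        key = (group, topic)
        self._groups[key].append(client_id)
        self._dispatch[key] = cycle(self._groups[key])

    def publish(self, topic, payload):
        deliveries = []
        for (group, sub_topic), clients in self._groups.items():
            if sub_topic == topic and clients:
                # Exactly one member of each matching group receives the message.
                deliveries.append((next(self._dispatch[(group, sub_topic)]), payload))
        return deliveries

broker = SharedSubscriptionBroker()
for worker in ("connector-0", "connector-1", "connector-2"):
    broker.subscribe(worker, "$share/ckafka/vehicles/telemetry")

# Three published messages are spread across the three connector workers.
receivers = [broker.publish("vehicles/telemetry", f"msg-{i}")[0][0] for i in range(3)]
print(receivers)  # ['connector-0', 'connector-1', 'connector-2']
```

Because each message goes to only one group member, adding workers to the shared group raises aggregate throughput without duplicating messages.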
Data Mapping
When MQTT messages are converted to Kafka records, the mapping relationship is as follows:
MQTT Message

| Field | Description |
| --- | --- |
| Packet ID | System field. Control packet ID; not unique and reused quickly. See MQTT Spec 2.2.1. |
| Duplicated | System field. Whether this message is a redelivery. |
| QoS | System field. Quality of Service level of the message. |
| Retained | System field. Whether this is a retained message. |
| Message ID | Extended field. Unique message number. |
| Message Timestamp | Extended field. Server-side message storage time. |
| Publisher Client ID | Extended field. Identifier of the client publishing the message. |
| Publisher Client Host | Extended field. IP address of the client publishing the message. |
| Publisher Username | Extended field. Username of the client publishing the message. |
| User Properties | List of key-value pairs specified by the user. See MQTT Spec 3.3.2.3.7. |

Kafka Record

| Field | Description |
| --- | --- |
| Key | Key of the record; optional. |
| Headers | Key-value pairs associated with the record, often used to store metadata such as content type and event time; optional. |
| Payload | Actual data of the record (the message body). |
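The mapping above can be sketched as a small conversion function. The header key names below (`mqtt.packet_id` and so on) are illustrative assumptions, not the connector's documented keys; inspect the records the connector actually produces for the authoritative names.

```python
# Sketch of the MQTT-message -> Kafka-record mapping described above.
# Header key names are assumptions for illustration only.
def mqtt_to_kafka_record(mqtt_msg: dict) -> dict:
    """Map an MQTT PUBLISH (parsed into a dict) to a Kafka record dict."""
    headers = {
        # MQTT system fields carried as Kafka record headers.
        "mqtt.packet_id": str(mqtt_msg["packet_id"]),
        "mqtt.qos": str(mqtt_msg["qos"]),
        "mqtt.retained": str(mqtt_msg["retained"]),
        "mqtt.duplicated": str(mqtt_msg["duplicated"]),
        # Extended fields added by the broker.
        "mqtt.message_id": mqtt_msg["message_id"],
        "mqtt.timestamp": str(mqtt_msg["timestamp"]),
        "mqtt.client_id": mqtt_msg["client_id"],
        "mqtt.client_host": mqtt_msg["client_host"],
        "mqtt.username": mqtt_msg["username"],
    }
    # MQTT 5.0 user properties map naturally onto additional headers.
    headers.update(mqtt_msg.get("user_properties", {}))
    return {
        "key": mqtt_msg["client_id"],     # optional record key
        "headers": headers,               # metadata key-value pairs
        "payload": mqtt_msg["payload"],   # MQTT payload -> record value
    }

msg = {
    "packet_id": 42, "qos": 1, "retained": False, "duplicated": False,
    "message_id": "m-0001", "timestamp": 1700000000000,
    "client_id": "vehicle-123", "client_host": "10.0.0.8",
    "username": "ckafka_connector", "payload": b'{"speed": 62}',
    "user_properties": {"content-type": "application/json"},
}
record = mqtt_to_kafka_record(msg)
```

Carrying the MQTT metadata in headers keeps the payload untouched, so downstream Kafka consumers can read the original message body while still filtering on the MQTT context.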
Headers Usage Scenarios
- Message routing
- Metadata storage and description
- Tracing and logging
- Custom business handling
- Security authentication
- Message priority
- Interoperability/compatibility commands
- Stream processing
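As a minimal sketch of the routing and priority scenarios above, a consumer can branch on header values without parsing the payload. The header keys (`priority`, `content-type`) are hypothetical examples, not keys guaranteed by the connector.

```python
# Sketch: routing consumed Kafka records by header values.
# Header keys used here are hypothetical examples.
def route(record: dict) -> str:
    headers = dict(record["headers"])
    # Priority messages go to a dedicated handler (message-priority scenario).
    if headers.get("priority") == "high":
        return "fast-lane"
    # Content type steers format-specific pipelines (routing scenario).
    if headers.get("content-type") == "application/json":
        return "json-pipeline"
    return "default"

destinations = [
    route({"headers": {"priority": "high"}}),
    route({"headers": {"content-type": "application/json"}}),
    route({"headers": {}}),
]
print(destinations)  # ['fast-lane', 'json-pipeline', 'default']
```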
Scenarios
Smart City and Transportation Digital Twin
The system collects multi-source traffic data (such as license plate numbers, speeds, and travel trajectories) from across the city in real time, reports it via MQTT topics, and integrates it into the big data ecosystem through CKafka connectors.
It supports efficient search and analysis by attributes such as license plate number (for example, vehicle trajectory restoration), providing data support for traffic monitoring, dispatching, and simulation.
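Trajectory restoration in this scenario amounts to grouping ingested records by license plate and ordering them by time. The sketch below is illustrative only; the field names (`plate`, `ts`, `lat`, `lon`) are assumptions about the reported payload, not a defined schema.

```python
# Sketch: restoring vehicle trajectories from ingested records by grouping
# on license plate. Field names are illustrative assumptions.
from collections import defaultdict

def trajectories(records):
    by_plate = defaultdict(list)
    for rec in records:
        by_plate[rec["plate"]].append((rec["ts"], rec["lat"], rec["lon"]))
    # Sort each vehicle's points by timestamp to restore its trajectory.
    return {plate: sorted(points) for plate, points in by_plate.items()}

records = [
    {"plate": "B-123", "ts": 2, "lat": 39.91, "lon": 116.40},
    {"plate": "B-123", "ts": 1, "lat": 39.90, "lon": 116.39},
    {"plate": "C-456", "ts": 1, "lat": 39.95, "lon": 116.30},
]
tracks = trajectories(records)
print(tracks["B-123"])  # [(1, 39.9, 116.39), (2, 39.91, 116.4)]
```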
Features and Advantages
TDMQ for CKafka is a distributed message system with high throughput and high scalability. However, it is not inherently designed for edge IoT communication scenarios, as its clients typically require stable network environments and substantial hardware resources. In contrast, the massive volumes of data generated by devices and applications in the IoT domain are often transmitted via the lightweight MQTT protocol. The CKafka MQTT connector enables seamless integration between the MQTT protocol and the CKafka ecosystem, allowing MQTT messages published by devices to be streamed in real time into CKafka topics. This ensures data can be processed, stored, and analyzed in real time. Such integration preserves MQTT's advantages in unstable networks and low-resource environments while fully utilizing CKafka's capabilities in high throughput, reliability, and ecosystem compatibility. This achieves flexible, stable, and efficient integration between IoT data and big data systems.
Operation Steps
Policy and Permissions
1. Log in to the TDMQ for MQTT console, go to the Cluster Details page, and check whether authorization policy management is enabled for the current MQTT cluster.
1.1 If the permission policy is not enabled, data plane resources have no permission management, and any "username + password" can be used for operations such as connection, production, and consumption. For details, see Configure Data Plane Authorization. In this case, no additional configuration is required when integrating data into CKafka; however, the lack of permission control carries certain data security risks.
1.2 If the permission policy is enabled, follow the steps below.
2. Go to Authentication > Username and Password, click Create User to create a dedicated account and password for the Data Integration task, with the username ckafka_connector. Specify in the description that this user is exclusively used for MQTT and CKafka data integration tasks, as shown in the figure.
3. Go to the Authorization Policy page and click Create Policy. It is strongly recommended to explicitly authorize the CKafka data integration account created in the previous step within this policy to achieve granular permission control, as shown in the figure below. Fill in other fields according to actual requirements. For details, refer to Configuring Data Plane Authorization.
Configuring a CKafka Connector
1. Log in to the TDMQ for CKafka console, go to the Connection List page, and first confirm the region to which the connections belong at the top of the page.
2. Click Create Connection to create the connector.
3. Follow the steps below to select connection information. Select MQTT Cluster as the connection type, then click Next to go to the Connection Configuration page.
4. Enter the connection name and description, and select the target MQTT cluster from the dropdown. The username and password here are used for connection authentication; they are the dedicated data integration account credentials created in the MQTT cluster (see Policy and Permissions). Click Next to go to the Connection Validation page.
5. After all validations pass, the connection is successfully created. You can view the newly added connection in the TDMQ for CKafka console > Connectors > Connection List.
For a created connection, the connection list displays its basic information, including ID, Name, Status, Connection Type, Bound Resources, Resource Region, Number of Associated Tasks, Creation Time, and Description.
Click Edit in the Actions column to modify connection configurations. After a connection is updated, the system enables the "Update and restart all associated tasks" option by default; exercise caution and adjust this option based on actual business requirements.
Click Delete in the Actions column to delete this connection.
Creating a Data Integration Task
Prerequisites
Task Creation
1. In the TDMQ for CKafka console, choose Connectors > Task List, then click Create Task in the upper-left corner and fill in the task information. Select Data Integration > MQTT Cluster as the task type, then click Next to go to the Data Source Configuration page.
2. Select an appropriate connection from the dropdown. If no suitable option is available, click the button below to go to the Create Connection step. Enter the subscribed topic; separate multiple subscribed topics with ",".
3. Configure the data target: specify the distribution policy and the target CKafka instance. Click Submit to complete task creation.
4. When the task is successfully created, a shared subscription group will be automatically created under the MQTT cluster for data integration.
You can also go to the Client page to view details of the connector client for this task.