tencent cloud

Stream Compute Service

Releases Notes and Announcements
Release Notes
Product Introduction
Overview
Strengths
Use Cases
Purchase Guide
Billing Overview
Billing Mode
Refund
Configuration Adjustments
Getting Started
Preparations
Creating a Private Cluster
Creating a SQL Job
Creating a JAR Job
Creating an ETL Job
Creating a Python Job
Operation Guide
Managing Jobs
Developing Jobs
Monitoring Jobs
Job Logs
Events and Diagnosis
Managing Metadata
Managing Checkpoints
Tuning Jobs
Managing Dependencies
Managing Clusters
Managing Permissions
SQL Developer Guide
Overview
Glossary and Data Types
DDL Statements
DML Statements
Merging MySQL CDC Sources
Connectors
SET Statement
Operators and Built-in Functions
Identifiers and Reserved Words
Python Developer Guide
ETL Developer Guide
Overview
Glossary
Connectors
FAQ
Contact Us

TDMQ for RabbitMQ

PDF
Modo Foco
Tamanho da Fonte
Última atualização: 2023-11-08 14:55:24

Overview

TDMQ for RabbitMQ (RMQ) is Tencent's proprietary message queue service. It supports the AMQP 0-9-1 protocol and is compatible with all components of Apache RabbitMQ. It can be used as a sink. You can output stream data processed by Flink operators to an RMQ queue.

Versions

Flink Version‌
Description
1.11
Supported
1.13
Supported
1.14
Unsupported
1.16
Unsupported

Limits

RMQ can be used as a sink. It does not support upsert streams.

Defining a table in DDL

As a sink

JSON format

CREATE TABLE `rmq_sink_json_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.
'host' = 'xxxx', -- The RMQ host.
'port' = 'xxxx', -- The RMQ port.
'vhost' = '/', -- The virtual host.
'username' = 'xxxx', -- The username.
'password' = 'xxxx', -- The password.
'exchange' = 'exchange', -- The exchange name.
'routing-key' = 'Key', -- The key bound.
'format' = 'json', -- The data format (JSON).
'json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.
'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
);

CSV format

CREATE TABLE `rmq_sink_csv_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.
'host' = 'xxxx', -- The RMQ host.
'port' = 'xxxx', -- The RMQ port.
'vhost' = '/', -- The virtual host.
'username' = 'xxxx', -- The username.
'password' = 'xxxx', -- The password.
'exchange' = 'exchange', -- The exchange name.
'routing-key' = 'Key', -- The key bound.
'format' = 'csv', -- The data format (CSV).
);

WITH parameters

Common WITH parameters

Option
Required
Default Value
Description
connector
Yes
-
Here, it should be 'rabbitmq'.
host
Yes
-
The host of the queue.
port
Yes
5672
The RMQ port.
vhost
Yes
/
The virtual host.
name-server
Yes
guest
The role name.
password
Yes
guest
The password of the role.
queue
No
-
The queue name.
exchange
Yes
-
The exchange name.
routing-key
No
-
The key bound by the exchange.
delivery-mode
No
1
Whether messages will be persistent. 1: No; 2: Yes.
expiration
No
86400000
The message validity period (milliseconds), which is one day by default.
network-recovery-interval
No
30s
The network recovery interval.
automatic-recovery
No
true
Whether to connect to RMQ automatically. Automatic connection is enabled by default.
topology-recovery
No
true
Whether to recover RMQ topology automatically. Automatic topology recovery is enabled by default.
connection-timeout
No
30s
The connection timeout period, which is 30 seconds by default.
requested-frame-max
No
0
The maximum frame size (bytes) for the first request. If this is 0, no limit will be set.
requested-heartbeat
No
60s
The heartbeat timeout period.
prefetch-count
No
0
The maximum number of messages the server can send. If this is 0, no limit will be set. This option is only supported by Flink 1.13.
delivery-timeout
No
30s
The commit timeout period. This option is only supported by Flink 1.13.
format
Yes
-
The input and output format of RMQ messages. Valid values include 'csv' and 'json'.

WITH parameters for JSON

Option
Required
Default Value
Description
json.fail-on-missing-field
No
false
If this is true, the job will fail in case of missing parameters. If this is false (default), the missing parameters will be set to null and the job will continue to be executed.
json.ignore-parse-errors
No
false
If this is true, when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false, the job will fail in case of a parse error.
json.timestamp-format.standard
No
SQL
The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, and the format will be yyyy-MM-ddTHH:mm:ss.s{precision}.

WITH parameters for CSV

Option
Required
Default Value
Description
csv.field-delimiter
No
,
The field delimiter, which is comma by default.
csv.line-delimiter
No
U&'\\000A'
The line delimiter, which is \\n by default (in SQL, you must use U&'\\000A'). You can also set it to \\r (in SQL, you need to use U&'\\000D'). This option is only supported by Flink 1.11.
csv.disable-quote-character
No
false
Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used.
csv.quote-character
No
''
The quote characters. Text inside quotes will be viewed as a whole. The default value is ''.
csv.ignore-parse-errors
No
false
Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure.
csv.allow-comments
No
false
Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well).
csv.array-element-delimiter
No
;
The array element delimiter, which is ; by default.
csv.escape-character
No
-
The escape character. By default, escape characters are disabled.
csv.null-literal
No
-
The string that will be seen as null.

Example

-- Please replace the parameter values below with the corresponding values for the cluster used.
CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'source_table_name',
'username' = 'username',
'password' = 'password'
);

CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH (
'connector' = 'rabbitmq',
'host' = 'host',
'port' = 'port',
'vhost' = 'vhost',
'username' = 'username',
'password' = 'password',
'queue' = 'queue-name',
'exchange'='exchange',
'routing-key'='key',
'format' = 'json'
);
insert into rabbitmq_sink_json_table select * from rabbitmq_source_json_table;
Note
When RMQ is used as a sink, there is a small possibility of duplication.

Ajuda e Suporte

Esta página foi útil?

comentários