Release Notes
Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Unsupported |
CREATE TABLE `rmq_sink_json_table` (`id` int,`name` STRING) WITH ('connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.'host' = 'xxxx', -- The RMQ host.'port' = 'xxxx', -- The RMQ port.'vhost' = '/', -- The virtual host.'username' = 'xxxx', -- The username.'password' = 'xxxx', -- The password.'exchange' = 'exchange', -- The exchange name.'routing-key' = 'Key', -- The key bound.'format' = 'json', -- The data format (JSON).'json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.);
CREATE TABLE `rmq_sink_csv_table` (`id` int,`name` STRING) WITH ('connector' = 'rabbitmq', -- Here, it should be 'rabbitmq'.'host' = 'xxxx', -- The RMQ host.'port' = 'xxxx', -- The RMQ port.'vhost' = '/', -- The virtual host.'username' = 'xxxx', -- The username.'password' = 'xxxx', -- The password.'exchange' = 'exchange', -- The exchange name.'routing-key' = 'Key', -- The key bound.'format' = 'csv', -- The data format (CSV).);
Option | Required | Default Value | Description |
connector | Yes | - | Here, it should be 'rabbitmq'. |
host | Yes | - | The host of the queue. |
port | Yes | 5672 | The RMQ port. |
vhost | Yes | / | The virtual host. |
name-server | Yes | guest | The role name. |
password | Yes | guest | The password of the role. |
queue | No | - | The queue name. |
exchange | Yes | - | The exchange name. |
routing-key | No | - | The key bound by the exchange. |
delivery-mode | No | 1 | Whether messages will be persistent. 1: No; 2: Yes. |
expiration | No | 86400000 | The message validity period (milliseconds), which is one day by default. |
network-recovery-interval | No | 30s | The network recovery interval. |
automatic-recovery | No | true | Whether to connect to RMQ automatically. Automatic connection is enabled by default. |
topology-recovery | No | true | Whether to recover RMQ topology automatically. Automatic topology recovery is enabled by default. |
connection-timeout | No | 30s | The connection timeout period, which is 30 seconds by default. |
requested-frame-max | No | 0 | The maximum frame size (bytes) for the first request. If this is 0, no limit will be set. |
requested-heartbeat | No | 60s | The heartbeat timeout period. |
prefetch-count | No | 0 | The maximum number of messages the server can send. If this is 0, no limit will be set. This option is only supported by Flink 1.13. |
delivery-timeout | No | 30s | The commit timeout period. This option is only supported by Flink 1.13. |
format | Yes | - | The input and output format of RMQ messages. Valid values include 'csv' and 'json'. |
Option | Required | Default Value | Description |
json.fail-on-missing-field | No | false | If this is true, the job will fail in case of missing parameters. If this is false (default), the missing parameters will be set to null and the job will continue to be executed. |
json.ignore-parse-errors | No | false | If this is true, when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false, the job will fail in case of a parse error. |
json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, and the format will be yyyy-MM-ddTHH:mm:ss.s{precision}. |
Option | Required | Default Value | Description |
csv.field-delimiter | No | , | The field delimiter, which is comma by default. |
csv.line-delimiter | No | U&'\\000A' | The line delimiter, which is \\n by default (in SQL, you must use U&'\\000A'). You can also set it to \\r (in SQL, you need to use U&'\\000D'). This option is only supported by Flink 1.11. |
csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used. |
csv.quote-character | No | '' | The quote characters. Text inside quotes will be viewed as a whole. The default value is ''. |
csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure. |
csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well). |
csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
csv.null-literal | No | - | The string that will be seen as null. |
-- Please replace the parameter values below with the corresponding values for the cluster used.CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai','table-name' = 'source_table_name','username' = 'username','password' = 'password');CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH ('connector' = 'rabbitmq','host' = 'host','port' = 'port','vhost' = 'vhost','username' = 'username','password' = 'password','queue' = 'queue-name','exchange'='exchange','routing-key'='key','format' = 'json');insert into rabbitmq_sink_json_table select * from rabbitmq_source_json_table;
Apakah halaman ini membantu?
Anda juga dapat Menghubungi Penjualan atau Mengirimkan Tiket untuk meminta bantuan.
masukan