Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Supported, based on community release 1.1.1. |
1.16 | Supported, based on community release 24.0.1. |
1.18 | Supported, based on community release 24.0.1. |
-- Prerequisite: the Doris FE configuration must set enable_http_server_v2 = true.
-- Sink table for the older connector options (sink.batch.size / sink.batch.interval).
CREATE TABLE doris_sink_table (
    id   INT,
    name VARCHAR
) WITH (
    'connector'           = 'doris',               -- Fixed value 'doris'.
    'fenodes'             = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'table.identifier'    = 'test.sales_order',    -- Doris table name. Format: db.tbl.
    'username'            = 'root',                -- Username for accessing Doris, with write permission to the database.
    'password'            = 'password',            -- Password for accessing Doris.
    'sink.batch.size'     = '500',                 -- Maximum number of rows per BE write.
    'sink.batch.interval' = '1s'                   -- Flush interval: after this time, an asynchronous thread writes the cached data to the backend.
                                                   -- Default 1s; supported units: ms, s, min, h, d. Setting 0 disables periodic writes.
);
-- Sink table for the newer connector (2PC / Exactly-Once).
-- Note: 2PC (sink.enable-2pc) is enabled by default.
CREATE TABLE doris_sink_table (
    id   INT,
    name VARCHAR
) WITH (
    'connector'         = 'doris',               -- Fixed value 'doris'.
    'fenodes'           = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'table.identifier'  = 'test.sales_order',    -- Doris table name. Format: db.tbl.
    'username'          = 'root',                -- Username for accessing Doris, with write permission to the database.
    'password'          = 'password',            -- Password for accessing Doris.
    'sink.label-prefix' = 'doris_label'          -- Stream Load label prefix; must be unique across jobs so 2PC keeps Exactly-Once semantics.
);
-- Register a Doris catalog so Doris tables can be addressed as doris_catalog.db.tbl.
CREATE CATALOG doris_catalog WITH (
    'type'             = 'doris',               -- Fixed value 'doris'.
    'fenodes'          = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'username'         = 'root',                -- Username for accessing Doris, with write permission to the database.
    'password'         = 'password',            -- Password for accessing Doris.
    'default-database' = 'default'              -- Default database.
);
Parameter | Description | Required | Remarks |
connector | Connector type. | Yes | Fixed value: doris. |
fenodes | Doris FE HTTP address. | Yes | - |
table.identifier | Doris table name. Format: db1.tbl1. | Yes | - |
username | Username for accessing Doris. | Yes | - |
password | Password for accessing Doris. | Yes | - |
sink.batch.size | Maximum number of rows per backend write. | No | Default value: 100. |
sink.max-retries | Number of retry attempts after BE write failure. | No | Default value: 1. |
sink.batch.interval | Flush interval. After this time, the asynchronous threads will write data in the cache to the backend. The default value is 1s, and the supported time units include ms, s, min, h, and d. Setting to 0 means disabling periodic writes. | No | Default value: 1s. |
sink.properties.* | Stream Load import parameters, e.g. 'sink.properties.column_separator' = ','. | No | - |
sink.enable-2pc | Specifies whether to use transactional writes. | No | false |
Parameter | Description | Required | Remarks |
type | Catalog type. | Yes | Fixed value: doris. |
fenodes | Doris FE HTTP address. | Yes | - |
username | Username for accessing Doris. | Yes | - |
password | Password for accessing Doris. | Yes | - |
default-database | Default database. | Yes | - |
Parameter | Description | Required | Default Value |
connector | Connector type. | Yes | Fixed value: doris. |
fenodes | Doris FE HTTP address. | Yes | - |
table.identifier | Doris table name. Format: db1.tbl1. | Yes | - |
username | Username for accessing Doris. | Yes | - |
password | Password for accessing Doris. | Yes | - |
sink.properties.* | Stream Load import parameters. For example, 'sink.properties.column_separator' = ',' defines the column separator, and 'sink.properties.escape_delimiters' = 'true' allows special characters to be used as separators ('\\x01' is converted to binary 0x01). For JSON import, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. For detailed parameters, see Stream Load > Load Configuration Parameters. Group Commit mode: for example, 'sink.properties.group_commit' = 'sync_mode' sets Group Commit to sync mode. The Flink connector supports configuring Group Commit for imports starting from version 1.6.2. For detailed usage and limitations, see High Concurrency LOAD Optimization (Group Commit). | No | - |
sink.enable-2pc | Specifies whether to enable 2PC. It is enabled by default, ensuring Exactly-Once semantics. | No | true |
sink.buffer-size | The size of the write data cache buffer (in bytes). We recommend keeping the default value. | No | 1MB |
sink.buffer-count | The number of write data cache buffers. We recommend keeping the default value. | No | 3 |
sink.max-retries | Maximum retry attempts after a Commit failure. Default attempts: 3. | No | 3 |
sink.use-cache | Specifies whether to use memory cache for recovery in case of an exception. Once it is enabled, the data during the Checkpoint period will be retained in the cache. | No | false |
sink.enable.batch-mode | Specifies whether to use batch mode to write to Doris. When enabled, write timing no longer depends on Checkpoint and is controlled by the sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval parameters. Exactly-Once semantics is not guaranteed in this mode; idempotency can be achieved with the Unique Key model. | No | false |
sink.flush.queue-size | The cache queue size in batching mode. | No | 2 |
sink.buffer-flush.max-rows | Maximum rows of data per single batch in batching mode. | No | 500000 |
sink.buffer-flush.max-bytes | The maximum number of bytes written to a single batch in batching mode. | No | 100MB |
sink.buffer-flush.interval | The interval at which the cache is flushed asynchronously in batching mode. | No | 10s |
sink.ignore.update-before | Specifies whether to ignore the update-before event. Ignore by default. | No | true |
Doris Field Type | Flink Field Type |
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
HLL | Unsupported datatype |
----- Flink 1.13 -----

-- Source: generates test rows.
CREATE TABLE datagen_source_table (
    id   INT,
    name STRING
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '1'   -- Number of data rows generated per second.
);

-- Sink: Doris table written through the connector.
CREATE TABLE doris_sink_table (
    id   INT,
    name STRING
) WITH (
    'connector'           = 'doris',               -- Fixed value 'doris'.
    'fenodes'             = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'table.identifier'    = 'test.sales_order',    -- Doris table name. Format: db.tbl.
    'username'            = 'root',                -- Username for accessing Doris, with write permission to the database.
    'password'            = 'password',            -- Password for accessing Doris.
    'sink.batch.size'     = '500',                 -- Maximum number of rows per BE write.
    'sink.batch.interval' = '1s'                   -- Flush interval: after this time, an asynchronous thread writes the cached data to the backend.
                                                   -- Default 1s; supported units: ms, s, min, h, d. Setting 0 disables periodic writes.
);

INSERT INTO doris_sink_table
SELECT id, name
FROM datagen_source_table;
-- Source: generates test rows.
CREATE TABLE datagen_source_table (
    id   INT,
    name STRING
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '1'   -- Number of data rows generated per second.
);

-- Sink: Doris table (newer connector; 2PC enabled by default).
CREATE TABLE doris_sink_table (
    id   INT,
    name STRING
) WITH (
    'connector'         = 'doris',               -- Fixed value 'doris'.
    'fenodes'           = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'table.identifier'  = 'db.table',            -- Doris table name. Format: db.tbl.
    'username'          = 'root',                -- Username for accessing Doris.
    'password'          = 'password',            -- Password for accessing Doris.
    'sink.label-prefix' = 'doris_label'          -- Stream Load label prefix; must be unique across jobs.
);

INSERT INTO doris_sink_table
SELECT id, name
FROM datagen_source_table;
-- Register the Doris catalog.
CREATE CATALOG doris_catalog WITH (
    'type'             = 'doris',               -- Fixed value 'doris' (required).
    'fenodes'          = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'username'         = 'root',                -- Username for accessing Doris, with write permission to the database.
    'password'         = 'password',            -- Password for accessing Doris.
    'default-database' = 'default'              -- Default database.
);

-- Source: generates test rows.
CREATE TABLE datagen_source_table (
    id   INT,
    name STRING
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '1'   -- Number of data rows generated per second.
);

-- my_database.my_table is a placeholder for an existing Doris table with columns (id, name).
INSERT INTO doris_catalog.my_database.my_table
SELECT id, name
FROM datagen_source_table;
-- MySQL CDC source table.
CREATE TABLE mysql_cdc_source_table (
    id   INT NOT NULL,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',     -- Fixed value 'mysql-cdc'.
    'hostname'      = 'YourHostName',  -- IP address of the database.
    'port'          = '3306',          -- Port for database access.
    'username'      = 'YourUserName',  -- Username for database access (requires SHOW DATABASES, REPLICATION SLAVE,
                                       -- REPLICATION CLIENT, SELECT, and RELOAD permissions).
    'password'      = 'YourPassword',  -- Password for database access.
    'database-name' = 'YourDatabase',  -- Database to be synchronized.
    'table-name'    = 'YourTable'      -- Name of the data table to be synchronized.
);

-- Doris sink table.
CREATE TABLE doris_sink_table (
    id   INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'         = 'doris',               -- Fixed value 'doris'.
    'fenodes'           = 'FE_IP:FE_HTTP_PORT',  -- Doris FE HTTP address.
    'table.identifier'  = 'dbName.tableName',    -- Doris table name. Format: db.tbl.
    'username'          = 'YourUserName',        -- Username for accessing Doris, with write permission to the database.
    'password'          = 'YourPassword',        -- Password for accessing Doris.
    'sink.label-prefix' = 'doris_label'          -- Stream Load label prefix; must be unique across jobs.
);

INSERT INTO doris_sink_table
SELECT id, name
FROM mysql_cdc_source_table;
-- Doris-side target table. Run ONLY ONE of the two statements below; both create doris_sink_table.
-- Note: if there are fewer backend nodes than replication_num, the error
-- "Failed to find enough host in all backends" occurs. Reduce the value as needed.

-- Option 1: Unique Key model.
CREATE TABLE doris_sink_table (
    `id`   INT(11),
    `name` VARCHAR(1024)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
    "replication_num" = "3"
);

-- Option 2: Aggregate Key model.
CREATE TABLE doris_sink_table (
    `id`   INT(11),
    `name` VARCHAR(1024) REPLACE DEFAULT '0'
)
AGGREGATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
    "replication_num" = "3"
);
-- Create a dedicated Doris user for the connector and grant it all privileges on database `test`.
CREATE USER 'test'@'%' IDENTIFIED BY 'test_passwd';
GRANT ALL ON test.* TO 'test'@'%';
피드백