Stream Compute Service

Doris

Last updated: 2026-05-19 17:16:54

Introduction

The Flink Doris Connector supports writing data from Flink to Doris. It is built on the open-source community connector.

Version Description

| Flink Version | Description |
| --- | --- |
| 1.11 | Supported |
| 1.13 | Supported |
| 1.14 | Supported, based on community release 1.1.1. |
| 1.16 | Supported, based on community release 24.0.1. |
| 1.18 | Supported, based on community release 24.0.1. |
Note:
In Flink 1.14 and later, two-phase commit (2PC) is enabled by default.

Applicable Scope

The Flink Doris Connector currently supports Doris only as a sink. Doris 0.14.0 or later is required, and the configuration enable_http_server_v2 = true must be enabled.
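
One way to check this prerequisite before creating a sink is to query the FE configuration from a MySQL client connected to Doris. The sketch below assumes the current Doris account has admin privileges. The exact output columns may differ between Doris versions.

```sql
-- Run against Doris (not Flink), for example from a MySQL client connected to the FE.
-- Assumption: the current user has admin privileges.
ADMIN SHOW FRONTEND CONFIG LIKE 'enable_http_server_v2';
```

If the reported value is false, the setting is typically changed by adding enable_http_server_v2 = true to the FE configuration file and restarting FE.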

DDL Definition

Note:
Flink 1.13 uses different DDL parameters from other Flink versions. Use the DDL that matches your Flink version.

Serving as a Data Sink (Sink) (Flink 1.13)

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- Fixed value 'doris'.
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'table.identifier' = 'test.sales_order', -- Doris table name. Format: db.tbl.
'username' = 'root', -- Username for accessing Doris, with write permission to the database.
'password' = 'password', -- Password for accessing Doris.
'sink.batch.size' = '500', -- Maximum number of rows per BE write.
'sink.batch.interval' = '1s' -- Flush interval. After this time, the asynchronous thread will write data in the cache to the backend. The default value is 1s, and supported time units include ms, s, min, h, and d. Setting to 0 means disabling periodic writes.
);

Serving as a Data Sink (Sink)

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- Fixed value 'doris'.
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'table.identifier' = 'test.sales_order', -- Doris table name. Format: db.tbl.
'username' = 'root', -- Username for accessing Doris, with write permission to the database.
'password' = 'password' -- Password for accessing Doris.
);

-- Note: 2PC is enabled by default.
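
Because 2PC is on by default, writes are tied to Flink checkpoints (the parameter table describes batching mode as the case where write timing is independent of Checkpoint). The following is a minimal sketch that sets both options explicitly. It assumes the checkpoint interval can be set from SQL; on a managed platform it may be configured in the job parameters instead. The table name doris_sink_table_2pc and the 10s interval are illustrative choices, and sink.label-prefix is taken from the sample code in this document.

```sql
-- Assumption: checkpointing can be configured with a SET statement in this environment.
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE doris_sink_table_2pc (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:FE_HTTP_PORT',          -- Doris FE HTTP address.
    'table.identifier' = 'test.sales_order',   -- Doris table name. Format: db.tbl.
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label',       -- Label prefix, as used in the sample code.
    'sink.enable-2pc' = 'true'                 -- Already the default; shown only for clarity.
);
```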

Serving as Catalogs

CREATE CATALOG doris_catalog WITH (
'type' = 'doris',
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'username' = 'root', -- Username for accessing Doris, with write permission to the database.
'password' = 'password', -- Password for accessing Doris.
'default-database' = 'default'
);

WITH Parameters (Flink Version 1.13)

Sink

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | Connector type. | Yes | Fixed value: doris. |
| fenodes | Doris FE HTTP address. | Yes | - |
| table.identifier | Doris table name. Format: db1.tbl1. | Yes | - |
| username | Username for accessing Doris. | Yes | - |
| password | Password for accessing Doris. | Yes | - |
| sink.batch.size | Maximum number of rows per BE write. | No | Default value: 100. |
| sink.max-retries | Number of retries after a BE write failure. | No | Default value: 1. |
| sink.batch.interval | Flush interval. After this time elapses, an asynchronous thread writes the cached data to BE. Supported time units: ms, s, min, h, and d. Setting it to 0 disables periodic writes. | No | Default value: 1s. |
| sink.properties.* | Stream Load import parameters. For example, 'sink.properties.column_separator' = ','. | No | - |
| sink.enable-2pc | Specifies whether to use transactional writes. | No | Default value: false. |
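
As an illustration of how the optional parameters above combine on Flink 1.13, here is a sketch of a sink that flushes larger batches less often and retries failed writes. The table name doris_sink_table_tuned and the chosen values are assumptions, not recommendations.

```sql
-- Flink 1.13 only: these batch parameters differ from those of other Flink versions.
CREATE TABLE doris_sink_table_tuned (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:FE_HTTP_PORT',
    'table.identifier' = 'test.sales_order',
    'username' = 'root',
    'password' = 'password',
    'sink.batch.size' = '1000',                -- Flush after 1000 buffered rows (default: 100).
    'sink.batch.interval' = '5s',              -- Or flush every 5 seconds (default: 1s).
    'sink.max-retries' = '3',                  -- Retry a failed BE write up to 3 times (default: 1).
    'sink.properties.column_separator' = ','   -- Passed through to Stream Load.
);
```

With a comma as the column separator, values that themselves contain commas would need a different separator.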

Catalog

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| type | - | Yes | Fixed value: doris. |
| fenodes | Doris FE HTTP address. | Yes | - |
| username | Username for accessing Doris. | Yes | - |
| password | Password for accessing Doris. | Yes | - |
| default-database | Default database. | Yes | - |
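
Once a catalog has been created with these parameters, standard Flink SQL statements can be used to browse it. The sketch below assumes a catalog named doris_catalog already exists, as in the DDL section; my_database and my_table are placeholder names.

```sql
-- Switch to the Doris catalog and list what it exposes.
USE CATALOG doris_catalog;
SHOW DATABASES;

-- my_database and my_table are placeholders for an existing Doris database and table.
USE my_database;
SHOW TABLES;
DESCRIBE my_table;
```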

WITH Parameters

Sink

| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| connector | Connector type. | Yes | Fixed value: doris. |
| fenodes | Doris FE HTTP address. | Yes | - |
| table.identifier | Doris table name. Format: db1.tbl1. | Yes | - |
| username | Username for accessing Doris. | Yes | - |
| password | Password for accessing Doris. | Yes | - |
| sink.properties.* | Stream Load import parameters. Column separators: for example, 'sink.properties.column_separator' = ', ' defines the column separator, and 'sink.properties.escape_delimiters' = 'true' allows special characters as separators, so '\\x01' is converted to binary 0x01. JSON import: 'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true'. For detailed parameters, see Stream Load > Load Configuration Parameters. Group Commit mode: for example, 'sink.properties.group_commit' = 'sync_mode' sets Group Commit to sync mode. The Flink connector supports configuring Group Commit for imports starting from version 1.6.2. For usage and limitations, see High Concurrency LOAD Optimization (Group Commit). | No | - |
| sink.enable-2pc | Specifies whether to enable 2PC, which ensures Exactly-Once semantics. | No | true |
| sink.buffer-size | Size of the write data cache buffer, in bytes. We recommend keeping the default value. | No | 1MB |
| sink.buffer-count | Number of write data cache buffers. We recommend keeping the default value. | No | 3 |
| sink.max-retries | Maximum number of retries after a commit failure. | No | 3 |
| sink.use-cache | Specifies whether to use an in-memory cache for recovery when an exception occurs. When enabled, the data written during the checkpoint period is retained in the cache. | No | false |
| sink.enable.batch-mode | Specifies whether to write to Doris in batching mode. When enabled, the write timing is independent of Checkpoint and is controlled by the sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval parameters. Exactly-Once semantics are not guaranteed in batching mode; idempotency can be achieved with the Unique model. | No | false |
| sink.flush.queue-size | Cache queue size in batching mode. | No | 2 |
| sink.buffer-flush.max-rows | Maximum number of rows per batch in batching mode. | No | 500000 |
| sink.buffer-flush.max-bytes | Maximum number of bytes per batch in batching mode. | No | 100MB |
| sink.buffer-flush.interval | Interval of the asynchronous cache flush in batching mode. | No | 10s |
| sink.ignore.update-before | Specifies whether to ignore update-before events. | No | true |
For more parameter descriptions, see Flink Doris Connector.
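
To make two of the optional settings above concrete, the sketch below defines one sink in batching mode and one that imports in JSON format. The table names and the threshold values are illustrative assumptions. Whether each option is available may depend on the community release behind your Flink version, so treat this as a starting point rather than a verified configuration.

```sql
-- Sink in batching mode: flushes are driven by the buffer thresholds, not by checkpoints.
-- Exactly-Once is not guaranteed here; a Unique-model Doris table keeps writes idempotent.
CREATE TABLE doris_sink_batch (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:FE_HTTP_PORT',
    'table.identifier' = 'test.sales_order',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_batch_label',  -- As in the sample code; value is illustrative.
    'sink.enable.batch-mode' = 'true',
    'sink.buffer-flush.max-rows' = '100000',    -- Flush after 100000 rows (default: 500000).
    'sink.buffer-flush.max-bytes' = '50mb',     -- Or after 50 MB (default: 100MB).
    'sink.buffer-flush.interval' = '5s'         -- Or every 5 seconds (default: 10s).
);

-- Sink that sends rows to Stream Load as line-delimited JSON.
CREATE TABLE doris_sink_json (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:FE_HTTP_PORT',
    'table.identifier' = 'test.sales_order',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_json_label',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true'
);
```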

Type Mapping

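| Doris Field Type | Flink Field Type |
| --- | --- |
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TIME | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMAL | DECIMAL |
| DECIMALV2 | DECIMAL |
| HLL | Unsupported datatype |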
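
As an illustration of the mapping, the sketch below pairs a Doris table with a matching Flink sink definition. The table test.orders, its columns, and the Flink table name doris_orders_sink are hypothetical, and the DECIMAL precision and replication_num are assumptions to adjust for your environment.

```sql
-- Doris side (run against Doris). Hypothetical table used only to show the mapping.
CREATE TABLE test.orders (
    order_id BIGINT,
    created_at DATETIME,
    big_counter LARGEINT,
    amount DECIMAL(10, 2),
    note VARCHAR(256)
)
DUPLICATE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Flink side. Each column uses the Flink type listed for its Doris type.
CREATE TABLE doris_orders_sink (
    order_id BIGINT,          -- Doris BIGINT
    created_at TIMESTAMP,     -- Doris DATETIME
    big_counter STRING,       -- Doris LARGEINT
    amount DECIMAL(10, 2),    -- Doris DECIMAL
    note STRING               -- Doris VARCHAR
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:FE_HTTP_PORT',
    'table.identifier' = 'test.orders',
    'username' = 'root',
    'password' = 'password'
);
```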

Sample Code

Flink 1.13

----- Flink 1.13 -----
CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- Number of data rows generated per second.
);

CREATE TABLE doris_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'doris', -- Fixed value 'doris'.
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'table.identifier' = 'test.sales_order', -- Doris table name. Format: db.tbl.
'username' = 'root', -- Username for accessing Doris, with write permission to the database.
'password' = 'password', -- Password for accessing Doris.
'sink.batch.size' = '500', -- Maximum number of rows per BE write.
'sink.batch.interval' = '1s' -- Flush interval. After this time, the asynchronous thread will write data in the cache to the backend. The default value is 1s, and the supported time units include ms, s, min, h, and d. Setting to 0 means disabling periodic writes.
);

INSERT INTO doris_sink_table select * from datagen_source_table;

Flink 1.14, Flink 1.16 and Flink 1.18


CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- Number of data rows generated per second.
);

CREATE TABLE doris_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:HTTP_PORT',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);

INSERT INTO doris_sink_table select * from datagen_source_table;

-- Example: writing through a Doris catalog.
CREATE CATALOG doris_catalog WITH (
'type' = 'doris', -- Fixed value 'doris'.
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'username' = 'root', -- Username for accessing Doris, with write permission to the database.
'password' = 'password', -- Password for accessing Doris.
'default-database' = 'default'
);

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- Number of data rows generated per second.
);

INSERT INTO doris_catalog.my_database.my_table SELECT * FROM datagen_source_table;
MySQL-CDC integration with Doris - code example:
--mysql cdc source table.
CREATE TABLE mysql_cdc_source_table (
id INT NOT NULL,
name VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- Fixed value 'mysql-cdc'.
'hostname' = 'YourHostName', -- IP address of the database.
'port' = '3306', -- Port for database access.
'username' = 'YourUserName', -- Username for database access (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'YourPassword', -- Password for database access.
'database-name' = 'YourDatabase', -- Database to be synchronized.
'table-name' = 'YourTable' -- Name of the data table to be synchronized.
);

-- Write to Doris table.
CREATE TABLE print_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'doris', -- Fixed value 'doris'.
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP address.
'table.identifier' = 'dbName.tableName', -- Doris table name. Format: db.tbl.
'username' = 'YourUserName', -- Username for accessing Doris, with write permission to the database.
'password' = 'YourPassword' -- Password for accessing Doris.
);

insert into print_table
select id,name from mysql_cdc_source_table;

Must-Knows

Upsert

If you want to use Upsert, the Doris table must be the Unique model or the Aggregate model. Example of creating a table:
-- Table creation statement for the Unique model.
CREATE TABLE doris_sink_table (
id int(11),
name varchar(1024)
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "3");

-- Table creation statement for the Aggregate model.
CREATE TABLE doris_sink_table (
id int(11),
name varchar(1024) REPLACE DEFAULT '0'
)
AGGREGATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "3"); -- Note that if the backend nodes are insufficient, the error Failed to find enough host in all backends will occur. You can reduce this value as needed.

User Permission

The Doris user configured in the connector must have write permission on the target database. For example:
CREATE USER 'test' IDENTIFIED BY 'test_passwd';
GRANT ALL ON test TO test;
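
If granting all privileges is broader than needed, a narrower grant may be enough. The sketch below is an assumption-laden example: the user name flink_writer is hypothetical, and it assumes that load and read privileges on the database are sufficient for the connector in your Doris version, which should be verified before relying on it.

```sql
-- Run against Doris. 'flink_writer' is a hypothetical account name.
CREATE USER 'flink_writer' IDENTIFIED BY 'test_passwd';

-- Assumption: LOAD_PRIV (write) plus SELECT_PRIV (read) on database test is sufficient.
GRANT SELECT_PRIV, LOAD_PRIV ON test.* TO 'flink_writer';

-- Confirm what the account has been granted.
SHOW GRANTS FOR 'flink_writer';
```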

More FAQs
