Tencent Cloud

Stream Compute Service


Elasticsearch Service Data Analytics Engine

Last updated: 2026-05-12 21:49:36

Description

The Elasticsearch connector supports both writing to and reading from Elasticsearch. Oceanus currently supports Elasticsearch 6.x and 7.x.

Version Description

Flink Version | Description
1.11 | Supported
1.13 | Supported (sink and batch source)
1.14 | Supported
1.16 | Supported

Applicable Scope

Elasticsearch can serve as the target table (sink) for either a Tuple (append-only) stream or an Upsert stream. For an Upsert stream, the document _id is generated from the primary key, and the previous version of the document is updated.
To consume the change records of a JDBC database as a streaming source table, capture and subscribe to the database changes with Debezium or Canal, and then process the change events further in Flink. For details, see Kafka.
Oceanus supports reading from Elasticsearch in batch mode. Currently, only Elasticsearch 7 is supported for reads.
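The following Flink SQL sketch illustrates the Upsert case. The table names, the index user-totals, and the datagen settings are hypothetical; only the host address is reused from this page. Because the sink declares user_id as its primary key, each updated aggregate overwrites the document whose _id is that user_id instead of adding a new document.

```sql
-- Hypothetical datagen source; user_id is limited to 1..10 so keys repeat
CREATE TABLE orders_source (
  user_id INT,
  amount INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100'
);

-- Upsert sink: the primary key user_id is used to build the document _id
CREATE TABLE es_user_totals (
  user_id INT,
  total_amount BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://10.28.28.94:9200', -- example address from this page
  'index' = 'user-totals',             -- hypothetical index name
  'format' = 'json'
);

-- The running SUM per user_id produces updates; each update overwrites
-- the document with the same _id
INSERT INTO es_user_totals
SELECT user_id, SUM(CAST(amount AS BIGINT)) AS total_amount
FROM orders_source
GROUP BY user_id;
```

Without the PRIMARY KEY clause, the sink works in append mode, which the Flink documentation describes as intended for insert-only queries rather than aggregations like this one.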

DDL Definition

Used As a Data Sink for Elasticsearch 6

CREATE TABLE elasticsearch6_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED -- Corresponds to _id in Elasticsearch
) WITH (
'connector' = 'elasticsearch-6', -- Output to Elasticsearch 6
'username' = '$username', -- optional username
'password' = '$password', -- optional password
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch connection address
'index' = 'my-index', -- Elasticsearch index name
'document-type' = '_doc', -- Elasticsearch document type
'format' = 'json' -- Output data format; currently only 'json' is supported
);

Used As a Data Sink for Elasticsearch 7

CREATE TABLE elasticsearch7_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED -- Corresponds to the _id in Elasticsearch
) WITH (
'connector' = 'elasticsearch-7', -- output to Elasticsearch 7
'username' = '$username', -- optional username
'password' = '$password', -- optional password
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch connection address
'index' = 'my-index', -- Elasticsearch index name
'format' = 'json' -- Output data format; currently only 'json' is supported
);

Used As a Batch Source for Elasticsearch 7

CREATE TABLE elasticsearch7_source_table (
id bigint,
event_date int,
app int,
primary key (id) not enforced
) with (
-- Required parameters
'connector' = 'es-source',
'endPoint' = '127.0.0.1', -- Elasticsearch connection IP address
'accessId' = 'elastic', -- Username
'accessKey' = 'PASSWORD', -- Password
'indexName' = 'my-index', -- Elasticsearch index name
'format' = 'json', -- Data format; only 'json' is supported
-- Optional parameters
'scheme' = 'http', -- Connection protocol
'port' = '9200', -- Port
'batchSize' = '2000', -- Maximum number of documents retrieved per scroll request from the Elasticsearch cluster
'keepScrollAliveSecs' = '60' -- Maximum retention time of the scroll context, in minutes
);

WITH Parameters

As a Data Sink

connector
Required: Yes. Default: none.
Set to elasticsearch-6 to write to Elasticsearch 6.x, or to elasticsearch-7 to write to Elasticsearch 7.x and later.

username
Required: No. Default: none.
Username.

password
Required: No. Default: none.
Password.

hosts
Required: Yes. Default: none.
Elasticsearch connection address.

index
Required: Yes. Default: none.
Index that data is written to. Both fixed indexes (such as 'myIndex') and dynamic indexes (such as 'index-{log_ts|yyyy-MM-dd}') are supported.

document-type
Required: Yes for Elasticsearch 6.x; No for Elasticsearch 7.x. Default: none.
Document type in Elasticsearch. Do not set this option when connector is elasticsearch-7; otherwise, an error occurs.

document-id.key-delimiter
Required: No. Default: _
Delimiter used to build the _id from a composite primary key. For example, if the primary key consists of fields a, b, and c, and a record has a = "1", b = "2", and c = "3", the default delimiter produces the _id "1_2_3" in Elasticsearch.

drop-delete
Required: No. Default: false.
Whether to drop DELETE messages from upstream.
In addition, when a multi-table LEFT JOIN uses a join key that is not the primary key, enabling this option prevents Elasticsearch from receiving a large amount of temporary null data. Note that the fields of the left and right tables must not contain null values; otherwise, some data may be lost.

failure-handler
Required: No. Default: fail.
Error-handling policy when a request to Elasticsearch fails. Options:
fail: Throw an exception.
ignore: Ignore the error and continue.
retry-rejected: Retry writing the record.
A custom error handler is also supported: enter the fully qualified class name of your own handler class (this requires uploading a custom program package).

sink.flush-on-checkpoint
Required: No. Default: true.
Whether Flink, when taking a snapshot, waits until all pending records have been written to Elasticsearch. Setting this to false speeds up snapshots but may cause some data to be lost or duplicated on recovery.

sink.bulk-flush.max-actions
Required: No. Default: 1000.
Maximum number of records per bulk write. Set to 0 to disable batching.

sink.bulk-flush.max-size
Required: No. Default: 2mb.
Maximum size of the bulk write buffer. The value must use mb as its unit. Set to 0 to disable batching.

sink.bulk-flush.interval
Required: No. Default: 1s.
Flush interval for bulk writes. Set to 0 to disable batching.

sink.bulk-flush.backoff.strategy
Required: No. Default: DISABLED.
Retry policy when a bulk write fails:
DISABLED: Do not retry.
CONSTANT: Wait for the number of milliseconds set in sink.bulk-flush.backoff.delay, then retry.
EXPONENTIAL: Wait for the number of milliseconds set in sink.bulk-flush.backoff.delay before the first retry, then increase the wait exponentially after each subsequent failure.

sink.bulk-flush.backoff.max-retries
Required: No. Default: 8.
Maximum number of retries when a bulk write fails.

sink.bulk-flush.backoff.delay
Required: No. Default: 50ms.
When a bulk write fails, the wait between retries (CONSTANT policy) or the initial wait (EXPONENTIAL policy).

connection.max-retry-timeout
Required: No. Default: none.
Maximum timeout for retried requests, such as "20 s".

connection.path-prefix
Required: No. Default: none.
Prefix added to every REST request, such as '/v1'. This option is normally not needed.

format
Required: No. Default: json.
Output format. The built-in json format is used by default. You can use the JSON format options described for Kafka, such as json.fail-on-missing-field, json.ignore-parse-errors, and json.timestamp-format.standard.

retry-on-conflict
Required: No. Default: none.
Maximum number of retries for version conflict exceptions during an update operation. If this count is exceeded, an exception is thrown and the job fails.
Note: Currently supported only in Flink 1.13.
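As an illustration of the index, document-id.key-delimiter, and format options, the following sketch writes to a daily index and builds each _id from a two-field primary key. The table, field, and index names are hypothetical, the delimiter '-' is an arbitrary choice, and ISO-8601 timestamps are an assumption about what the index's date mapping expects. A record with user_id 'u1', page_id 'p9', and a log_ts on 2026-05-12 would go to the index page-views-2026-05-12 with the _id 'u1-p9'.

```sql
-- Hypothetical sink table: composite primary key and a daily dynamic index
CREATE TABLE es_page_views (
  user_id STRING,
  page_id STRING,
  log_ts TIMESTAMP(3),
  view_count BIGINT,
  PRIMARY KEY (user_id, page_id) NOT ENFORCED -- _id is built as <user_id>-<page_id>
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://10.28.28.94:9200',
  'index' = 'page-views-{log_ts|yyyy-MM-dd}',   -- one index per day of log_ts
  'document-id.key-delimiter' = '-',            -- replaces the default '_'
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601' -- JSON format option, as for Kafka
);
```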

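The following sketch combines the failure-handling and bulk-flush options described above. The table name and all values are illustrative assumptions, not recommendations: with EXPONENTIAL backoff, a failed bulk request is first retried after 100 ms, the wait grows exponentially after each further failure, and retrying stops after 5 attempts.

```sql
-- Hypothetical sink table showing failure handling and bulk-flush tuning
CREATE TABLE es_sink_with_retry (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://10.28.28.94:9200',
  'index' = 'my-index',
  'format' = 'json',
  'failure-handler' = 'retry-rejected',         -- retry records rejected by Elasticsearch
  'sink.flush-on-checkpoint' = 'true',          -- default: wait for pending writes on checkpoint
  'sink.bulk-flush.max-actions' = '500',        -- flush after 500 buffered records...
  'sink.bulk-flush.max-size' = '5mb',           -- ...or 5 MB of buffered data...
  'sink.bulk-flush.interval' = '2s',            -- ...or every 2 seconds
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.delay' = '100ms',    -- wait before the first retry
  'sink.bulk-flush.backoff.max-retries' = '5'   -- give up after 5 retries
);
```

Smaller max-actions and max-size values send smaller, more frequent bulk requests, which may ease pressure on a busy cluster at some cost in throughput.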

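For the drop-delete scenario, the following sketch LEFT JOINs two datagen streams on user_id, which is not the sink's primary key. All table and index names are hypothetical. With 'drop-delete' = 'true', DELETE messages from upstream are not forwarded to Elasticsearch; as noted in the parameter description, this assumes that neither side contains null values in its fields.

```sql
-- Hypothetical order stream; order_id is the sink primary key
CREATE TABLE orders_stream (
  order_id BIGINT,
  user_id BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

-- Hypothetical user stream joined on user_id (not the sink primary key)
CREATE TABLE users_stream (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

CREATE TABLE es_order_detail (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://10.28.28.94:9200',
  'index' = 'order-detail', -- hypothetical index name
  'format' = 'json',
  'drop-delete' = 'true'    -- drop DELETE messages from upstream
);

INSERT INTO es_order_detail
SELECT o.order_id, o.user_id, u.user_name
FROM orders_stream AS o
LEFT JOIN users_stream AS u ON o.user_id = u.user_id;
```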
As a Data Source

connector
Required: Yes. Default: none.
Fixed value: es-source.

endPoint
Required: Yes. Default: none.
Elasticsearch connection IP address, such as 127.0.0.1.

accessId
Required: Yes. Default: none.
Username.

accessKey
Required: Yes. Default: none.
Password.

indexName
Required: Yes. Default: none.
Index to read from.

format
Required: Yes. Default: none.
Read format. Only the built-in json format is supported. You can use the JSON format options described for Kafka, such as json.fail-on-missing-field, json.ignore-parse-errors, and json.timestamp-format.standard.

scheme
Required: No. Default: http.
Elasticsearch connection protocol, such as http or https.

port
Required: No. Default: 9200.
Elasticsearch connection port.

batchSize
Required: No. Default: 2000.
Maximum number of documents retrieved from the Elasticsearch cluster per scroll request.

keepScrollAliveSecs
Required: No. Default: 60.
Maximum retention time of the scroll context, in minutes.
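As a sketch of the optional source parameters, the following job reads over HTTPS and fetches smaller scroll pages than the default. The host, credentials, and index are the placeholder values used on this page; the table names and the filter value app = 1 are hypothetical, and whether your cluster accepts HTTPS on port 9200 is an assumption to check against your own deployment.

```sql
-- Batch source reading over HTTPS with smaller scroll pages (placeholder values)
CREATE TABLE es_https_source (
  id BIGINT,
  event_date INT,
  app INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'es-source',
  'endPoint' = '127.0.0.1',
  'accessId' = 'elastic',
  'accessKey' = 'PASSWORD',
  'indexName' = 'my-index',
  'format' = 'json',
  'scheme' = 'https',  -- use HTTPS instead of the default http
  'port' = '9200',     -- assumed HTTPS port; adjust to your cluster
  'batchSize' = '500'  -- fetch at most 500 documents per scroll request
);

CREATE TABLE es_https_logger_sink (
  id BIGINT,
  event_date INT,
  app INT
) WITH (
  'connector' = 'logger'
);

-- app = 1 is a hypothetical filter value
INSERT INTO es_https_logger_sink
SELECT id, event_date, app
FROM es_https_source
WHERE app = 1;
```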

Sample Code

As a Data Sink

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- Number of data records generated per second
);

CREATE TABLE elasticsearch7_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'elasticsearch-7', -- output to Elasticsearch 7
'username' = '$username', -- optional username
'password' = '$password', -- optional password
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch connection address
'index' = 'my-index', -- Name of the Elasticsearch index
'sink.bulk-flush.max-actions' = '1000', -- Maximum number of records per bulk write
'sink.bulk-flush.interval' = '1s', -- Bulk write flush interval
'format' = 'json' -- Output data format; currently only 'json' is supported
);

INSERT INTO elasticsearch7_sink_table select * from datagen_source_table;
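For Elasticsearch 6, a sketch of the same pipeline differs mainly in the connector value and the document-type option, which is required for 6.x. The table names below are hypothetical; the host, index, and document type reuse the example values from this page.

```sql
-- Hypothetical datagen source for the Elasticsearch 6 variant
CREATE TABLE datagen_source_es6 (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE elasticsearch6_sink_sample (
  id INT,
  name STRING
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://10.28.28.94:9200',
  'index' = 'my-index',
  'document-type' = '_doc',              -- required for elasticsearch-6
  'sink.bulk-flush.max-actions' = '1000',
  'sink.bulk-flush.interval' = '1s',
  'format' = 'json'
);

INSERT INTO elasticsearch6_sink_sample
SELECT id, name FROM datagen_source_es6;
```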

As a Batch Source

CREATE TABLE elasticsearch7_source_table (
id bigint,
event_date int,
app int,
primary key (id) not enforced
) with (
-- Required parameters
'connector' = 'es-source',
'endPoint' = '127.0.0.1', -- Elasticsearch connection IP address
'accessId' = 'elastic', -- Username
'accessKey' = 'PASSWORD', -- Password
'indexName' = 'my-index', -- Elasticsearch index name
'format' = 'json', -- Data format; only 'json' is supported
-- Optional parameters
'scheme' = 'http', -- Connection protocol
'port' = '9200', -- Port
'batchSize' = '2000', -- Maximum number of documents retrieved per scroll request from the Elasticsearch cluster
'keepScrollAliveSecs' = '60' -- Maximum retention time of the scroll context, in minutes
);

CREATE TABLE logger_sink (
id bigint,
event_date int,
app int,
primary key (id) not enforced
) with (
'connector' = 'logger'
);

INSERT INTO logger_sink SELECT * from elasticsearch7_source_table;

Notes

To connect to other versions of Elasticsearch, upload the corresponding Elasticsearch sink JAR package as an additional custom program package.

Monitoring Metrics

Oceanus adds several statistical metrics to the Elasticsearch connector. To view them, click the ES Sink operator in the job's running graph in the Flink UI and search for the metric name:
numberOfInsertRecords: Number of +I (insert) messages output.
numberOfDeleteRecords: Number of -D (delete) messages output.
numberOfUpdateBeforeRecords: Number of -U (update-before) messages output.
numberOfUpdateAfterRecords: Number of +U (update-after) messages output.
