Flink Version | Description |
1.11 | Supported |
1.13 | Supported (sink and batch source) |
1.14 | Supported |
1.16 | Supported |
_id field and updates the previous document version).
CREATE TABLE elasticsearch6_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED            -- Corresponds to _id in Elasticsearch
) WITH (
    'connector' = 'elasticsearch-6',         -- Output to Elasticsearch 6
    'username' = '$username',                -- Optional username
    'password' = '$password',                -- Optional password
    'hosts' = 'http://10.28.28.94:9200',     -- Elasticsearch connection address
    'index' = 'my-index',                    -- Elasticsearch index name
    'document-type' = '_doc',                -- Elasticsearch document type
    'format' = 'json'                        -- Output data format; only 'json' is currently supported
);
CREATE TABLE elasticsearch7_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED            -- Corresponds to _id in Elasticsearch
) WITH (
    'connector' = 'elasticsearch-7',         -- Output to Elasticsearch 7
    'username' = '$username',                -- Optional username
    'password' = '$password',                -- Optional password
    'hosts' = 'http://10.28.28.94:9200',     -- Elasticsearch connection address
    'index' = 'my-index',                    -- Elasticsearch index name
    'format' = 'json'                        -- Output data format; only 'json' is currently supported
);
CREATE TABLE elasticsearch7_source_table (
    id BIGINT,
    event_date INT,
    app INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- Required parameters
    'connector' = 'es-source',
    'endPoint' = '127.0.0.1',                -- Elasticsearch connection IP
    'accessId' = 'elastic',                  -- Username
    'accessKey' = 'PASSWORD',                -- Password
    'indexName' = 'my-index',                -- Elasticsearch index name
    'format' = 'json',                       -- Data format; only 'json' is supported
    -- Optional parameters
    'scheme' = 'http',                       -- Connection protocol
    'port' = '9200',                         -- Port
    'batchSize' = '2000',                    -- Maximum number of documents retrieved per scroll request from the Elasticsearch cluster
    'keepScrollAliveSecs' = '60'             -- Maximum duration for scroll context retention, in minutes
);
Parameter | Required | Default Value | Description |
connector | Yes | No | Set to elasticsearch-6 when writing to Elasticsearch 6.x, and to elasticsearch-7 when writing to Elasticsearch 7.x or later. |
username | No | No | Username. |
password | No | No | Password. |
hosts | Yes | No | Elasticsearch connection address. |
index | Yes | No | The index that data is written to. Both a fixed index (such as 'myIndex') and a dynamic index (such as 'index-{log_ts|yyyy-MM-dd}') are supported. |
document-type | 6.x: Yes; 7.x: No (must be left unset) | No | Document type in Elasticsearch. When the connector is elasticsearch-7, this option must not be set; otherwise an error occurs. |
document-id.key-delimiter | No | _ | Delimiter used to build the _id from a composite primary key. For example, with three primary key fields a, b, and c, and a record where a is "1", b is "2", and c is "3", the default delimiter produces the _id "1_2_3". |
drop-delete | No | false | Whether to filter out DELETE messages from upstream. When a multi-table LEFT JOIN is performed on a JOIN key that is not a primary key, enabling this option prevents Elasticsearch from receiving large amounts of temporary null data. Note that fields in the left and right tables must not contain null values; otherwise, some data may be lost. |
failure-handler | No | fail | Error handling policy when a request to Elasticsearch fails. Options: fail (throw an exception), ignore (ignore the error and continue), retry-rejected (retry writing the record). A custom error handler is also supported: specify the fully qualified name of your own handler class (this requires uploading a custom program package). |
sink.flush-on-checkpoint | No | true | Whether Flink waits for pending records to be fully written to Elasticsearch when taking a snapshot. Setting this to false makes snapshots faster, but may cause partial data loss or duplication on recovery. |
sink.bulk-flush.max-actions | No | 1000 | The maximum number of entries for batch writing. Set to 0 to disable the batch function. |
sink.bulk-flush.max-size | No | 2mb | The maximum size of the batch write buffer. The value must be specified in mb. Set to 0 to disable the batch function. |
sink.bulk-flush.interval | No | 1s | The flush interval for batch writing. Set to 0 to disable the batch function. |
sink.bulk-flush.backoff.strategy | No | DISABLED | The retry policy when a batch write fails. DISABLED: do not retry. CONSTANT: wait for the time set in sink.bulk-flush.backoff.delay, then retry. EXPONENTIAL: wait for the time set in sink.bulk-flush.backoff.delay before the first retry, then increase the wait exponentially after each failure. |
sink.bulk-flush.backoff.max-retries | No | 8 | The maximum number of retries on failure during batch writing. |
sink.bulk-flush.backoff.delay | No | 50ms | When a batch write fails, the wait between retries (for the CONSTANT policy) or the initial wait that is then increased (for the EXPONENTIAL policy). |
connection.max-retry-timeout | No | No | The maximum timeout for retried requests, such as "20 s". |
connection.path-prefix | No | No | Specify the prefix for each REST request, such as '/v1'. Normally, this option is not required. |
format | No | json | The output format. Defaults to the built-in json format. You can use the JSON format options described earlier (Kafka), such as json.fail-on-missing-field, json.ignore-parse-errors, and json.timestamp-format.standard. |
retry-on-conflict | No | No | The maximum number of retries allowed for version conflict exceptions during an update operation. When this count is exceeded, an exception is thrown and the job fails. Note: currently supported only on Flink 1.13. |
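The following is a minimal sketch of how several of the sink options above might be combined: a composite primary key with a custom `document-id.key-delimiter`, a dynamic index derived from a timestamp field, and an exponential backoff policy for failed bulk writes. The table name `es7_orders_sink`, its columns, the index name, and all option values are hypothetical and chosen only for illustration; the option keys are the ones listed in the table.

```sql
-- Illustrative sketch only: table name, columns, and values are assumptions.
CREATE TABLE es7_orders_sink (
    user_id  STRING,
    order_id STRING,
    amount   DOUBLE,
    log_ts   TIMESTAMP(3),
    PRIMARY KEY (user_id, order_id) NOT ENFORCED     -- Composite key, joined into _id
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://10.28.28.94:9200',
    'index' = 'orders-{log_ts|yyyy-MM-dd}',          -- Dynamic index: one index per day of log_ts
    'document-id.key-delimiter' = '$',               -- Key fields are joined with "$" instead of "_"
    'failure-handler' = 'retry-rejected',
    'sink.bulk-flush.max-actions' = '500',
    'sink.bulk-flush.max-size' = '5mb',
    'sink.bulk-flush.interval' = '2s',
    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
    'sink.bulk-flush.backoff.max-retries' = '5',
    'sink.bulk-flush.backoff.delay' = '100ms',       -- Initial wait before the first retry
    'format' = 'json'
);
```

With these assumed settings, a row with `user_id` "u1" and `order_id` "o42" would be expected to get the `_id` "u1$o42", following the composite key rule described for `document-id.key-delimiter`.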
Parameter | Required | Default Value | Description |
connector | Yes | No | Fixed value: es-source. |
endPoint | Yes | No | Elasticsearch connection IP, for example 127.0.0.1. |
accessId | Yes | No | Username |
accessKey | Yes | No | Password |
indexName | Yes | No | Name of the index to read from. |
format | Yes | No | Specify the read format, which supports only the built-in json format. You can use the JSON format options described earlier (Kafka), such as json.fail-on-missing-field, json.ignore-parse-errors, and json.timestamp-format.standard. |
scheme | No | http | Elasticsearch connection protocol: http or https. |
port | No | 9200 | Elasticsearch connection port |
batchSize | No | 2000 | Maximum number of documents retrieved per scroll request from the ES cluster |
keepScrollAliveSecs | No | 60 | Maximum duration of scroll context retention, in minutes |
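As a hedged illustration of the optional source parameters above, the sketch below reads over https, uses a smaller scroll batch, keeps the scroll context alive longer, and enables one of the JSON format options mentioned for `format`. The table name `es_source_https` and the specific values are assumptions; any certificate or cluster-side setup that https may require is outside the scope of this sketch.

```sql
-- Illustrative sketch only: table name and option values are assumptions.
CREATE TABLE es_source_https (
    id         BIGINT,
    event_date INT,
    app        INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'es-source',
    'endPoint' = '127.0.0.1',
    'accessId' = 'elastic',
    'accessKey' = 'PASSWORD',
    'indexName' = 'my-index',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',   -- Skip documents that cannot be parsed instead of failing
    'scheme' = 'https',                    -- Overrides the default http
    'port' = '9200',
    'batchSize' = '500',                   -- Fewer documents per scroll request than the default 2000
    'keepScrollAliveSecs' = '120'          -- Longer scroll context retention than the default 60
);
```

A smaller `batchSize` combined with a longer scroll retention is one plausible trade-off when individual documents are large; suitable values depend on the cluster and should be tested.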
CREATE TABLE datagen_source_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'                  -- Number of data records generated per second
);

CREATE TABLE elasticsearch7_sink_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'elasticsearch-7',         -- Output to Elasticsearch 7
    'username' = '$username',                -- Optional username
    'password' = '$password',                -- Optional password
    'hosts' = 'http://10.28.28.94:9200',     -- Elasticsearch connection address
    'index' = 'my-index',                    -- Elasticsearch index name
    'sink.bulk-flush.max-actions' = '1000',  -- Maximum number of records per batch write
    'sink.bulk-flush.interval' = '1s',       -- Batch flush interval
    'format' = 'json'                        -- Output data format; only 'json' is currently supported
);

INSERT INTO elasticsearch7_sink_table SELECT * FROM datagen_source_table;
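The example above has no primary key, so every generated row is written as a new document. As a contrasting sketch, the job below declares a primary key on the sink so that each new result for a given `id` updates the document whose `_id` matches that key. The table names `datagen_ids` and `es7_id_count_sink`, the index name `id-count`, and the `fields.id.min` and `fields.id.max` bounds are assumptions introduced only to keep the key space small.

```sql
-- Illustrative sketch only: table names, index name, and value bounds are assumptions.
CREATE TABLE datagen_ids (
    id   INT,
    name STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.id.min' = '1',                   -- Restrict id to a small range so keys repeat
    'fields.id.max' = '10'
);

CREATE TABLE es7_id_count_sink (
    id  INT,
    cnt BIGINT,
    PRIMARY KEY (id) NOT ENFORCED            -- Used as the Elasticsearch _id
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://10.28.28.94:9200',
    'index' = 'id-count',
    'format' = 'json'
);

-- The GROUP BY produces an updating result: each new count for an id
-- replaces the previous document with the same _id.
INSERT INTO es7_id_count_sink
SELECT id, COUNT(*) AS cnt
FROM datagen_ids
GROUP BY id;
```

Under these assumptions the index should settle at no more than ten documents, one per distinct `id`, with `cnt` growing over time.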
CREATE TABLE elasticsearch7_source_table (
    id BIGINT,
    event_date INT,
    app INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- Required parameters
    'connector' = 'es-source',
    'endPoint' = '127.0.0.1',                -- Elasticsearch connection IP
    'accessId' = 'elastic',                  -- Username
    'accessKey' = 'PASSWORD',                -- Password
    'indexName' = 'my-index',                -- Elasticsearch index name
    'format' = 'json',                       -- Data format; only 'json' is supported
    -- Optional parameters
    'scheme' = 'http',                       -- Connection protocol
    'port' = '9200',                         -- Port
    'batchSize' = '2000',                    -- Maximum number of documents retrieved per scroll request from the Elasticsearch cluster
    'keepScrollAliveSecs' = '60'             -- Maximum duration for scroll context retention, in minutes
);

CREATE TABLE logger_sink (
    id BIGINT,
    event_date INT,
    app INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'logger'
);

INSERT INTO logger_sink SELECT * FROM elasticsearch7_source_table;