MongoDB

Last updated: 2023-11-08 14:51:17

Overview

The Flink MongoDB connector (mongodb) writes data from Flink to MongoDB in batches. It currently supports only append (tuple) streams.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported |
| 1.14 | Unsupported |
| 1.16 | Supported |

Defining a table in DDL

CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',   -- Must be 'mongodb'.
    'database' = 'test',       -- The name of the database.
    'collection' = 'table1',   -- The data collection.
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
    'batchSize' = '1024'       -- The number of records written per batch.
);

Limits

The Flink MongoDB connector can be used only as a sink. A TencentDB for MongoDB collection can be used as the result table.

WITH parameters

| Option | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The result table type. | Yes | Must be mongodb. |
| database | The name of the database. | Yes | - |
| collection | The data collection. | Yes | - |
| uri | The MongoDB connection string. | Yes | - |
| batchSize | The number of records written per batch. | No | Default value: 1024. |
| maxConnectionIdleTime | The connection idle timeout period. | No | Default value: 60000 (ms). |
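
The optional parameters are set in the same WITH clause as the required ones. The following is a minimal sketch only: the table name mongodb_tuned and the two values chosen for batchSize and maxConnectionIdleTime are illustrative assumptions, not recommendations from this document.

```sql
-- Hypothetical result table that overrides both optional parameters.
CREATE TABLE mongodb_tuned (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',
    'database' = 'test',
    'collection' = 'table1',
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin',
    'batchSize' = '512',                -- Assumed value; the default is 1024 records per batch.
    'maxConnectionIdleTime' = '30000'   -- Assumed value in ms; the default is 60000.
);
```

If either optional parameter is omitted, the default listed in the Remarks column applies.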

Example

CREATE TABLE random_source (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',             -- The number of records generated per second.
    'fields.user_id.kind' = 'sequence',    -- A bounded sequence; the output stops automatically when the sequence ends.
    'fields.user_id.start' = '1',          -- The start value of the sequence.
    'fields.user_id.end' = '10000',        -- The end value of the sequence.
    'fields.item_id.kind' = 'random',      -- An unbounded random number generator.
    'fields.item_id.min' = '1',            -- The minimum random number.
    'fields.item_id.max' = '1000',         -- The maximum random number.
    'fields.category_id.kind' = 'random',  -- An unbounded random number generator.
    'fields.category_id.min' = '1',        -- The minimum random number.
    'fields.category_id.max' = '1000',     -- The maximum random number.
    'fields.behavior.length' = '5'         -- The random string length.
);

CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',   -- Must be 'mongodb'.
    'database' = 'test',       -- The name of the database.
    'collection' = 'table1',   -- The data collection.
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
    'batchSize' = '1024'       -- The number of records written per batch.
);

INSERT INTO mongodb SELECT * FROM random_source;

Notes

Upsert

When used as a sink, the Flink MongoDB connector does not support upsert streams.
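
To illustrate the difference, the sketch below contrasts a query that produces an updating stream with one that stays append-only. It relies on general Flink SQL behavior: a non-windowed GROUP BY revises earlier results, while a tumbling-window aggregation emits each window's result once. The tables random_source_pt and mongodb_counts, the collection name user_counts, and the assumption that an INT count column is acceptable to the sink are all hypothetical and not taken from this document.

```sql
-- Hypothetical source with a processing-time attribute for windowing.
CREATE TABLE random_source_pt (
    user_id INT,
    item_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100'
);

-- Hypothetical result table for per-user counts.
CREATE TABLE mongodb_counts (
    user_id INT,
    item_count INT
) WITH (
    'connector' = 'mongodb',
    'database' = 'test',
    'collection' = 'user_counts',
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin'
);

-- Not suitable for this sink: a non-windowed GROUP BY keeps updating
-- the count for each user_id, which yields an upsert stream.
-- INSERT INTO mongodb_counts
-- SELECT user_id, CAST(COUNT(*) AS INT) FROM random_source_pt GROUP BY user_id;

-- Append-only alternative: each one-minute window emits its result once.
INSERT INTO mongodb_counts
SELECT user_id, CAST(COUNT(*) AS INT) AS item_count
FROM random_source_pt
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE), user_id;
```

With the windowed form, each (window, user_id) pair is written as a new document, so the collection accumulates one record per user per minute rather than a single running total.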

User permissions

The MongoDB user specified in the connection string must have permission to write data to the target database.
