tencent cloud

All product documents
Stream Compute Service
Last updated: 2023-11-08 14:51:17
MongoDB
Last updated: 2023-11-08 14:51:17

Overview

The ‌Flink connector mongodb allows batch data writing from Flink to MongoDB. It currently supports only append (tuple) streams.

Versions

Flink Version
Description
1.11
Unsupported
1.13
Supported
1.14
Unsupported
1.16
Supported

Defining a table in DDL

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- Here, it should be 'mongodb'.
'database' = 'test', -- The name of the database.
'collection' = 'table1',-- The data collection.
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
'batchSize' = '1024' -- The number of records written per batch.
);

Limits

The Flink connector mongodb can be used as a MongoDB sink only. It supports using a TencentDB for MongoDB table as a result table.

WITH parameters

Option
Description
Required
Remarks
connector
The result table type.
Yes
Here, it should be mongodb.
database
The name of the database.
Yes
-
collection
The data collection.
Yes
-
uri
The MongoDB connection string.
Yes
-
batchSize
The number of records written per batch.
No
Default value: 1024.
maxConnectionIdleTime
The connection timeout period.
No
Default value: 60000 (ms).

Example

CREATE TABLE random_source (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- The number of records generated per second.
'fields.user_id.kind' = 'sequence', -- Whether a bounded sequence (if yes, the output automatically stops after the sequence ends).
'fields.user_id.start' = '1', -- The start value of the sequence.
'fields.user_id.end' = '10000', -- The end value of the sequence.
'fields.item_id.kind' = 'random', -- A random number without range.
'fields.item_id.min' = '1', -- ‍The minimum random number.
'fields.item_id.max' = '1000', -- The maximum random number.
'fields.category_id.kind' = 'random', -- A random number without range.
'fields.category_id.min' = '1', -- The minimum random number.
'fields.category_id.max' = '1000', -- The maximum random number.
'fields.behavior.length' = '5' -- The random string length.
);

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- Here, it should be 'mongodb'.
'database' = 'test', -- The name of the database.
'collection' = 'table1',-- The data collection.
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
'batchSize' = '1024' -- The number of records written per batch.
);

insert into mongodb select * from random_source;

Notes

Upsert

‌The Flink connector mongodb as a sink does not support upsert streams.

User permissions

The user of the MongoDB database must have the permission to write data to the database.
Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 available.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon