Flink Version | Description |
1.11 | Unsupported |
1.13 | Supported |
1.14 | Supported |
1.16 | Supported |
-- Register the MongoDB collection 'products' (database 'inventory') as a Flink SQL table.
CREATE TABLE mongo_cdc_source_table (
    _id STRING, -- must be declared; it is the table's primary key below
    name STRING,
    weight DECIMAL(10, 3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10, 2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- array of embedded documents
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    -- comma-separated host:port list of the MongoDB servers
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser', -- example credentials; replace with your own
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
);
Option | Description | Required | Remarks |
connector | The connector to use. | Yes | The value must be 'mongodb-cdc'. |
hosts | IP and port of the MongoDB database server. | Yes | - |
username | Name of the database user to use when connecting to MongoDB. | Yes | - |
password | Password of the database user to use when connecting to MongoDB. | Yes | - |
database | The name of the MongoDB database to watch for changes. | Yes | - |
collection | The name of the collection in the MongoDB database to watch for changes. | Yes | - |
connection.options | Ampersand-separated MongoDB connection string options, such as replicaSet=test&connectTimeoutMS=300000. | No | - |
errors.tolerance | Whether to ignore error records. Valid values: none and all. If it is set to all, all error records are ignored. | No | Default value: none. |
errors.log.enable | Whether to print errors in logs. | No | Default value: true . |
copy.existing | Whether to copy the existing data in the database. If changes are made to the data during the copying, they will apply after the copying is completed. | No | Default value: true . |
copy.existing.pipeline | This option allows setting filters for copying the existing data. For example, if you set it to [{"$match": {"closed": "false"}}] , only records whose value of closed is false will be copied. For how to use this option, see $match (aggregation). | No | - |
copy.existing.max.threads | The number of threads to use when copying data. | No | Default value: the number of available processors. |
copy.existing.queue.size | The maximum size of the queue to use when copying data. | No | Default value: 16000 . |
poll.max.batch.size | The maximum number of change stream documents to include in a single batch when polling for new data. By default, with a check interval of 1.5s, up to 1,000 documents can be included each time. | No | Default value: 1000 . |
poll.await.time.ms | The amount of time, in milliseconds, to wait before checking for new results on the change stream. By default, with a check interval of 1.5s, up to 1,000 documents can be included each time. | No | Default value: 1500 . |
heartbeat.interval.ms | The length of time in milliseconds between sending heartbeat messages. Set it to 0 to disable the feature. | No | Default value: 0 . |
We recommend setting heartbeat.interval.ms
to an appropriate value greater than 0, especially if the collection changes infrequently. A resumeToken
is included in each heartbeat message, which prevents an expired resumeToken from being used
when a Flink job resumes from a checkpoint or savepoint.

MongoDB Type | Flink Type |
- | TINYINT |
- | SMALLINT |
Int | INT |
Long | BIGINT |
- | FLOAT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date, Timestamp | DATE |
Date, Timestamp | TIME |
Date | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>; ... |
-- Source: the MongoDB collection 'products' (database 'inventory') exposed as a Flink SQL table.
CREATE TABLE mongo_cdc_source_table (
    _id STRING, -- must be declared; it is the table's primary key below
    name STRING,
    weight DECIMAL(10, 3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10, 2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- array of embedded documents
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser', -- example credentials; replace with your own
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
);

-- Sink: a table backed by Flink's 'print' connector.
CREATE TABLE `print_table` (
    `id` STRING,
    `name` STRING,
    `currency` STRING
) WITH (
    'connector' = 'print'
);

-- Forward each product's id, name and price currency to the sink.
-- price.currency reads the currency field of the embedded price document.
INSERT INTO print_table
SELECT
    _id AS id,
    name,
    price.currency AS currency
FROM mongo_cdc_source_table;
The MongoDB user used by the connector must be granted the changeStream
and read
permissions.use admin;db.createUser({user: "flinkuser",pwd: "flinkpw",roles: [{ role: "read", db: "admin" },{ role: "readAnyDatabase", db: "admin" }]});
Was this page helpful?