Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 |
1.14 | 支持 |
1.16 | 支持 |
-- register a MongoDB table 'products' in Flink SQLCREATE TABLE mongo_cdc_source_table (_id STRING, // must be declaredname STRING,weight DECIMAL(10,3),tags ARRAY<STRING>, -- arrayprice ROW<amount DECIMAL(10,2), currency STRING>, -- embedded documentsuppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documentsPRIMARY KEY(_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017,localhost:27018,localhost:27019','username' = 'flinkuser','password' = 'flinkpw','database' = 'inventory','collection' = 'products');
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值为 mongodb-cdc |
hosts | MongoDB 数据库的 IP 端口对 | 是 | - |
username | MongoDB 数据库服务的用户名 | 是 | - |
password | MongoDB 数据库服务的密码 | 是 | - |
database | MongoDB 数据库名称 | 是 | - |
collection | MongoDB Collection 名称 | 是 | - |
connection.options | 否 | - | |
errors.tolerance | 是否忽略错误记录,接受 none 或者 all。如果设置为 all, 忽略错误记录 | 否 | none |
errors.log.enable | 是否需要把错误操作打印到日志文件 | 否 | 默认值为 true |
copy.existing | 是否复制库中原有的数据,如果在复制期间对数据有更改,会在数据复制完成后应用更改 | 否 | 默认值为 true |
copy.existing.pipeline | 当复制原有数据的时候,可以通过这个参数设置筛选条件。例如 [{"$match": {"closed": "false"}}] ,只会复制 closed 为 false 的 记录。用法参考 $match (aggregation) | 否 | - |
copy.existing.max.threads | 执行数据复制时要使用的线程数 | 否 | 默认值为 Processors Count |
copy.existing.queue.size | 复制数据时要使用的队列的最大大小 | 否 | 默认值为16000 |
poll.max.batch.size | 每次拉取数据的最大数量。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据 | 否 | 默认值为1000 |
poll.await.time.ms | 拉取数据的时间间隔。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据 | 否 | 默认值为1500 |
heartbeat.interval.ms | 发送心跳消息时间间隔,以毫秒为单位。使用0禁用 | 否 | 默认值为0 |
MongoDB 字段类型 | Flink 字段类型 |
- | TINYINT |
- | SMALLINT |
Int | INT |
Long | BIGINT |
- | FLOAT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
DateTimestamp | DATE |
DateTimestamp | TIME |
Date | TIMESTAMP(3)TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0)TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
Point : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ... |
CREATE TABLE mongo_cdc_source_table (_id STRING, // must be declaredname STRING,weight DECIMAL(10,3),tags ARRAY<STRING>, -- arrayprice ROW<amount DECIMAL(10,2), currency STRING>, -- embedded documentsuppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documentsPRIMARY KEY(_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017,localhost:27018,localhost:27019','username' = 'flinkuser','password' = 'flinkpw','database' = 'inventory','collection' = 'products');CREATE TABLE `print_table` (`id` STRING,`name` STRING,`currency` STRING) WITH ('connector' = 'print');insert into print_table select _id, name, price.currency from mongo_cdc_source_table;
use admin;db.createUser({user: "flinkuser",pwd: "flinkpw",roles: [{ role: "read", db: "admin" },{ role: "readAnyDatabase", db: "admin" }]});
本页内容是否解决了您的问题?