tencent cloud

文档反馈

MongoDB 数据订阅

最后更新时间:2024-09-09 21:38:57

    简介

    MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
    本文档是对 Mongo 官方文档的归纳和整理,详情参见 MongoDB Change Events

    事件格式

    以下 JSON 框架展示了所有修改事件消息中可能出现的字段:
    {
    _id : { <BSON Object> },
    "operationType" : "<operation>",
    "fullDocument" : { <document> },
    "ns" : {
    "db" : "<database>",
    "coll" : "<collection>"
    },
    "to" : {
    "db" : "<database>",
    "coll" : "<collection>"
    },
    "documentKey" : { "_id" : <value> },
    "updateDescription" : {
    "updatedFields" : { <document> },
    "removedFields" : [ "<field>", ... ],
    "truncatedArrays" : [
    { "field" : <field>, "newSize" : <integer> },
    ...
    ]
    },
    "clusterTime" : <Timestamp>,
    "txnNumber" : <NumberLong>,
    "lsid" : {
    "id" : <UUID>,
    "uid" : <BinData>
    }
    }
    
    其中部分字段可能只在特定的事件类型中才会出现,下表对相应字段及其含义进行了描述。
    Field
    Type
    Description
    _id
    document
    一个用来唯一标识事件的 BSON 对象。 _id 对象的格式如下:{ "_data" : <BinData|hex string>} 。_data 的类型取决于 MongoDB 的版本 ,可通过 Resume Tokens 查看完整的_data类型介绍。
    operationType
    string
    触发修改事件的操作类型,具体包括以下 8 种:insertdeletereplaceupdatedroprenamedropDatabaseinvalidate
    fullDocument
    document
    表示被新增( insert), 替换(replace), 删除(delete), 更新(update )操作所影响的文档。对于 insert 和 replace 操作,该字段表示新增的文档。对于 delete 操作,该字段缺省表示文档已经不存在。对于 update 操作,只有配置了 fullDocument 为 updateLookup 时才会显示。
    ns
    document
    命名空间(namespace),由 database 和 collection 构成。
    ns.db
    string
    数据库名称。
    ns.coll
    string
    集合名称。对于 dropDatabase 操作,该字段缺省。
    to
    document
    当操作类型为 rename 时,表示新的集合名称。该字段对其他操作是缺省的。
    to.db
    string
    新的数据库的名称。
    to.coll
    string
    新的集合名称。
    documentKey
    document
    操作修改的文档的 ID。
    updateDescription
    document
    一个用来描述被更新操作(update operation)修改的字段的文档。该字段仅当事件对应的操作为 update 时才有。
    updateDescription.updatedFields
    document
    包含被更新操作修改的字段,字段的 value 值为更新后的值。
    updateDescription.removedFields
    array
    包含被更新操作删除的字段。
    updateDescription.truncatedArrays
    array
    其中记录了使用以下一个或多个基于 pipeline 的更新执行的数组截断:$addFields$set$replaceRoot$replaceWith
    updateDescription.truncatedArrays.field
    string
    被删除的字段。
    updateDescription.truncatedArrays.newSize
    integer
    truncated array 中的元素个数。
    clusterTime
    Timestamp
    oplog 与事件关联的时间戳。对于涉及 多文档事务, 关联的事件的 clusterTime 值是相同的。
    txnNumber
    NumberLong
    事务 ID。仅当操作是 多文档事务 时出现。
    lsid
    Document
    与事务关联的 session 的 ID,仅当操作是 多文档事务 时出现。

    事件列表

    新增事件(insert event)

    {
    _id: { < Resume Token > },
    operationType: 'insert',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    documentKey: {
    userName: 'alice123',
    _id: ObjectId("599af247bb69cd8996xxxxxx")
    },
    fullDocument: {
    _id: ObjectId("599af247bb69cd8996xxxxxx"),
    userName: 'alice123',
    name: 'Alice'
    }
    }
    其中 documentKey 字段同时包含了 _id 和 username 字段。表示 engineering.users 集合是分片的,shard key 为 username 和 _id。

    更新事件(update event)

    {
    _id: { < Resume Token > },
    operationType: 'update',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    documentKey: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
    },
    updateDescription: {
    updatedFields: {
    email: 'alice@10gen.com'
    },
    removedFields: ['phoneNumber'],
    truncatedArrays: [ {
    "field" : "vacation_time",
    "newSize" : 36
    } ]
    }
    }
    以下例子展示了 update event 配置了 fullDocument : updateLookup 选项的消息内容:
    {
    _id: { < Resume Token > },
    operationType: 'update',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    documentKey: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
    },
    updateDescription: {
    updatedFields: {
    email: 'alice@10gen.com'
    },
    removedFields: ['phoneNumber'],
    truncatedArrays: [ {
    "field" : "vacation_time",
    "newSize" : 36
    } ]
    },
    fullDocument: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx"),
    name: 'Alice',
    userName: 'alice123',
    email: 'alice@10gen.com',
    team: 'replication'
    }
    }

    替换事件(replace event)

    {
    _id: { < Resume Token > },
    operationType: 'replace',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    documentKey: {
    _id: ObjectId("599af247bb69cd8996xxxxxx")
    },
    fullDocument: {
    _id: ObjectId("599af247bb69cd8996xxxxxx"),
    userName: 'alice123',
    name: 'Alice'
    }
    }
    replace 操作是通过两步操作实现的:
    删除原 documentKey 对应的文档
    根据一样的 documentkey插入新的文档
    基于 replace 事件的 fullDocument 字段表示的是插入后的新文档。

    删文档事件(delete event)

    {
    _id: { < Resume Token > },
    operationType: 'delete',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    documentKey: {
    _id: ObjectId("599af247bb69cd8996xxxxxx")
    }
    }
    对于删除文档事件的消息,fullDocument 字段缺省。

    删集合事件(drop event)

    {
    _id: { < Resume Token > },
    operationType: 'drop',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    }
    }
    当一个集合被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

    改名事件(rename event)

    {
    _id: { < Resume Token > },
    operationType: 'rename',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering',
    coll: 'users'
    },
    to: {
    db: 'engineering',
    coll: 'people'
    }
    }
    当一个集合名称被更改时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

    删库事件(drop database event)

    {
    _id: { < Resume Token > },
    operationType: 'dropDatabase',
    clusterTime: <Timestamp>,
    ns: {
    db: 'engineering'
    }
    }
    当一个数据库被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
    在生成数据库删除事件(dropDatabase)之前,会为数据库中的每一个集合生成一个集合删除事件(drop event)。

    无效事件(invalidate event)

    {
    _id: { < Resume Token > },
    operationType: 'invalidate',
    clusterTime: <Timestamp>
    }
    对于订阅了一个集合(collection)的 connector,drop event,rename event 或 dropDatabase event 这类会对该集合产生影响的事件都会产生一个无效事件。
    对于订阅了一个数据库(database)的 connector,dropDatabase event 会产生一个无效事件。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持