tencent cloud

MongoDB Data Subscription

Last updated: 2024-09-09 21:38:37

    Introduction

    The MongoDB Kafka Connector can monitor all databases in a MongoDB instance or a single database, and all collections in a database or a single collection. The connector generates change event messages from MongoDB modifications and publishes them as a message stream to a Kafka topic. Client applications consume the messages from that topic to process database change events, which lets them monitor specific databases.
    This document summarizes and organizes information from the official MongoDB documentation. For details, see MongoDB Change Events.

    Event Format

    The following JSON skeleton shows all the fields that can appear in a change event message:
    {
      _id : { <BSON Object> },
      "operationType" : "<operation>",
      "fullDocument" : { <document> },
      "ns" : {
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "to" : {
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "documentKey" : { "_id" : <value> },
      "updateDescription" : {
        "updatedFields" : { <document> },
        "removedFields" : [ "<field>", ... ],
        "truncatedArrays" : [
          { "field" : <field>, "newSize" : <integer> },
          ...
        ]
      },
      "clusterTime" : <Timestamp>,
      "txnNumber" : <NumberLong>,
      "lsid" : {
        "id" : <UUID>,
        "uid" : <BinData>
      }
    }
    
    Some fields may only appear in certain event types. The table below describes the corresponding fields and their meanings.
    | Field | Type | Description |
    |---|---|---|
    | _id | document | A BSON object that uniquely identifies the event. Its format is { "_data" : <value> }, where the value is BinData or a hex string depending on the MongoDB version. For a complete description of the _data type, see Resume Tokens. |
    | operationType | string | The type of operation that triggered the change event. One of 8 values: insert, delete, replace, update, drop, rename, dropDatabase, invalidate. |
    | fullDocument | document | The document affected by an insert, replace, delete, or update operation. For insert and replace operations, it is the new document. For delete operations, it is omitted because the document no longer exists. For update operations, it appears only if fullDocument is configured as updateLookup. |
    | ns | document | The namespace (database and collection) affected by the event. |
    | ns.db | string | The database name. |
    | ns.coll | string | The collection name. Omitted for dropDatabase operations. |
    | to | document | The new namespace of the collection. Present only when the operation type is rename; omitted for all other operations. |
    | to.db | string | The name of the new database. |
    | to.coll | string | The new collection name. |
    | documentKey | document | A document containing the _id of the document modified by the operation. For a sharded collection, it also contains the shard key fields. |
    | updateDescription | document | A document describing the fields modified by the update operation. Present only for update events. |
    | updateDescription.updatedFields | document | The fields modified by the update operation, each mapped to its new value. |
    | updateDescription.removedFields | array | The names of the fields removed by the update operation. |
    | updateDescription.truncatedArrays | array | The array truncations performed by pipeline-based updates that use one or more of the following stages: $addFields, $set, $replaceRoot, $replaceWith. |
    | updateDescription.truncatedArrays.field | string | The name of the array field that was truncated. |
    | updateDescription.truncatedArrays.newSize | integer | The number of elements in the truncated array. |
    | clusterTime | Timestamp | The timestamp of the oplog entry associated with the event. Events from the same multi-document transaction share the same clusterTime value. |
    | txnNumber | NumberLong | The transaction number. Present only when the operation is part of a multi-document transaction. |
    | lsid | document | The identifier of the session associated with the transaction. Present only when the operation is part of a multi-document transaction. |
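As a rough illustration of how a consumer might use the fields above, the following Python sketch parses one message and branches on operationType. It assumes the Kafka message value is a plain JSON string with the fields at the top level; the actual serialization depends on how the connector is configured. The function name summarize_event is hypothetical and not part of any MongoDB or Kafka API.

```python
import json

# Operation types that carry a documentKey (assumption: based on the field table above).
DATA_EVENTS = {"insert", "update", "replace", "delete"}


def summarize_event(message: str) -> str:
    """Return a one-line summary of a change event given as a JSON string."""
    event = json.loads(message)
    op = event["operationType"]

    # ns is absent for invalidate events, and ns.coll is absent for dropDatabase.
    ns = event.get("ns", {})
    target = ".".join(part for part in (ns.get("db"), ns.get("coll")) if part)

    if op in DATA_EVENTS:
        key = json.dumps(event["documentKey"], sort_keys=True)
        return f"{op} on {target}, key={key}"
    if op == "rename":
        to = event["to"]
        return f"rename {target} -> {to['db']}.{to['coll']}"
    if op in ("drop", "dropDatabase"):
        return f"{op} {target}"
    if op == "invalidate":
        return "invalidate"
    raise ValueError(f"unexpected operationType: {op!r}")
```

A real consumer would call a function like this once per message read from the topic and replace the summary strings with its own handling.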

    Event List

    Insert Event

    {
      _id: { < Resume Token > },
      operationType: 'insert',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      documentKey: {
        userName: 'alice123',
        _id: ObjectId("599af247bb69cd8996xxxxxx")
      },
      fullDocument: {
        _id: ObjectId("599af247bb69cd8996xxxxxx"),
        userName: 'alice123',
        name: 'Alice'
      }
    }
    The documentKey field contains both the _id and userName fields, indicating that the engineering.users collection is sharded with a shard key on userName and _id.
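The observation about documentKey can be checked mechanically. The sketch below lists the fields of a documentKey other than _id; a non-empty result suggests the collection is sharded on those fields. The helper name extra_key_fields is hypothetical, and the documentKey is assumed to be already parsed into a Python dict.

```python
def extra_key_fields(document_key: dict) -> list:
    """Return the documentKey field names other than _id, in their original order."""
    return [name for name in document_key if name != "_id"]


# Shaped like the insert event above (the _id value is a placeholder string here).
sharded_key = {"userName": "alice123", "_id": "599af247bb69cd8996xxxxxx"}
unsharded_key = {"_id": "599af247bb69cd8996xxxxxx"}
```

With these inputs, extra_key_fields(sharded_key) yields the single field userName, while extra_key_fields(unsharded_key) yields an empty list.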

    Update Event

    {
      _id: { < Resume Token > },
      operationType: 'update',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      documentKey: {
        _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
      },
      updateDescription: {
        updatedFields: {
          email: 'alice@10gen.com'
        },
        removedFields: ['phoneNumber'],
        truncatedArrays: [ {
          "field" : "vacation_time",
          "newSize" : 36
        } ]
      }
    }
    The following example shows the message content of an update event with the fullDocument : updateLookup option configured:
    {
      _id: { < Resume Token > },
      operationType: 'update',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      documentKey: {
        _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
      },
      updateDescription: {
        updatedFields: {
          email: 'alice@10gen.com'
        },
        removedFields: ['phoneNumber'],
        truncatedArrays: [ {
          "field" : "vacation_time",
          "newSize" : 36
        } ]
      },
      fullDocument: {
        _id: ObjectId("58a4eb4a30c75625e0xxxxxx"),
        name: 'Alice',
        userName: 'alice123',
        email: 'alice@10gen.com',
        team: 'replication'
      }
    }
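When fullDocument is not included, a consumer that keeps its own copy of a document can bring it up to date from updateDescription alone. The following Python sketch shows one way to do that. It is a simplification under stated assumptions: field names are treated as top-level keys (dotted paths into nested documents or array elements are not handled), the order in which the three parts are applied is a choice of this sketch, and apply_update_description is a hypothetical helper name.

```python
import copy


def apply_update_description(document: dict, update_description: dict) -> dict:
    """Return a new document with an update event's changes applied.

    Assumption: every field name is a top-level key of the document.
    """
    result = copy.deepcopy(document)

    # truncatedArrays: shrink each named array to newSize elements.
    for entry in update_description.get("truncatedArrays", []):
        array = result.get(entry["field"])
        if isinstance(array, list):
            result[entry["field"]] = array[: entry["newSize"]]

    # updatedFields: set each field to its new value.
    for name, value in update_description.get("updatedFields", {}).items():
        result[name] = value

    # removedFields: drop each listed field if it is present.
    for name in update_description.get("removedFields", []):
        result.pop(name, None)

    return result
```

The input document is deep-copied, so the caller's copy is left unchanged and can be kept for comparison or rollback.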

    Replace Event

    {
      _id: { < Resume Token > },
      operationType: 'replace',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      documentKey: {
        _id: ObjectId("599af247bb69cd8996xxxxxx")
      },
      fullDocument: {
        _id: ObjectId("599af247bb69cd8996xxxxxx"),
        userName: 'alice123',
        name: 'Alice'
      }
    }
    The replace operation is performed in two steps:
    1. Delete the original document corresponding to the documentKey.
    2. Insert a new document with the same documentKey.
    For a replace event, the fullDocument field represents the newly inserted document.
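A consumer that mirrors documents in a local cache can follow the same two steps when it receives a replace event. The sketch below assumes the cache is a plain dict and that documentKey values are JSON-serializable (for example, an ObjectId already converted to a string); cache_key and apply_replace are hypothetical helper names.

```python
import json


def cache_key(document_key: dict) -> str:
    """Build a stable string key from a documentKey, independent of field order."""
    return json.dumps(document_key, sort_keys=True)


def apply_replace(cache: dict, event: dict) -> None:
    """Mirror a replace event into a local cache, in place."""
    key = cache_key(event["documentKey"])
    cache.pop(key, None)                # step 1: delete the original document
    cache[key] = event["fullDocument"]  # step 2: insert the new document under the same key
```

Because the old entry is removed before the new one is stored, fields that existed only in the original document do not survive the replace.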

    Delete Event

    {
      _id: { < Resume Token > },
      operationType: 'delete',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      documentKey: {
        _id: ObjectId("599af247bb69cd8996xxxxxx")
      }
    }
    In a delete event message, the fullDocument field is omitted because the document no longer exists.

    Drop Event

    {
      _id: { < Resume Token > },
      operationType: 'drop',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      }
    }
    This event is triggered when a collection is dropped. It causes a connector subscribed to that collection to generate an invalidate event.

    Rename Event

    {
      _id: { < Resume Token > },
      operationType: 'rename',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering',
        coll: 'users'
      },
      to: {
        db: 'engineering',
        coll: 'people'
      }
    }
    This event is triggered when a collection is renamed. It causes a connector subscribed to that collection to generate an invalidate event.

    Drop Database Event

    {
      _id: { < Resume Token > },
      operationType: 'dropDatabase',
      clusterTime: <Timestamp>,
      ns: {
        db: 'engineering'
      }
    }
    This event is triggered when a database is dropped. It causes a connector subscribed to that database to generate an invalidate event.
    Before the dropDatabase event is generated, the system generates a drop event for each collection in the database.

    Invalidate Event

    {
      _id: { < Resume Token > },
      operationType: 'invalidate',
      clusterTime: <Timestamp>
    }
    For a connector subscribed to a collection, an invalidate event is generated when a drop, rename, or dropDatabase operation affects that collection.
    For a connector subscribed to a database, a dropDatabase operation generates an invalidate event.
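The two rules above can be written as a small lookup. The Python sketch below encodes only the two subscription scopes described in this section; the scope labels "collection" and "database" and the function name triggers_invalidate are illustrative choices, not connector settings.

```python
# Operations that lead to an invalidate event, per subscription scope,
# as described in this section.
INVALIDATING_OPERATIONS = {
    "collection": {"drop", "rename", "dropDatabase"},
    "database": {"dropDatabase"},
}


def triggers_invalidate(scope: str, operation_type: str) -> bool:
    """Return True if the operation is followed by an invalidate event for this scope."""
    if scope not in INVALIDATING_OPERATIONS:
        raise ValueError(f"unknown subscription scope: {scope!r}")
    return operation_type in INVALIDATING_OPERATIONS[scope]
```

For example, a rename is followed by an invalidate event for a collection-level subscription but not for a database-level one.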
    