tencent cloud

MongoDB Data Subscription
Last updated: 2024-09-09 21:38:37

Introduction

The MongoDB Kafka Connector can monitor all databases in a MongoDB instance or a single database, and within a database it can monitor all collections or a single collection. The connector generates a change event message for each MongoDB modification and publishes these messages as a stream to a Kafka topic. Client applications consume the messages from that topic to process database change events, which lets them monitor specific databases or collections.
This document summarizes information from the official MongoDB documentation. For details, see MongoDB Change Events.

Event Format

The following JSON skeleton shows all fields that may appear in a change event message:
{
  _id : { <BSON Object> },
  "operationType" : "<operation>",
  "fullDocument" : { <document> },
  "ns" : {
    "db" : "<database>",
    "coll" : "<collection>"
  },
  "to" : {
    "db" : "<database>",
    "coll" : "<collection>"
  },
  "documentKey" : { "_id" : <value> },
  "updateDescription" : {
    "updatedFields" : { <document> },
    "removedFields" : [ "<field>", ... ],
    "truncatedArrays" : [
      { "field" : <field>, "newSize" : <integer> },
      ...
    ]
  },
  "clusterTime" : <Timestamp>,
  "txnNumber" : <NumberLong>,
  "lsid" : {
    "id" : <UUID>,
    "uid" : <BinData>
  }
}

Some fields appear only in certain event types. The table below describes each field.
| Field | Type | Description |
| --- | --- | --- |
| _id | document | A BSON object that uniquely identifies the event. Its format is { "_data" : <value> }, where the value is BinData or a hex string depending on the MongoDB version. For a complete description of the _data type, see Resume Tokens. |
| operationType | string | The type of operation that triggered the change event. One of eight values: insert, delete, replace, update, drop, rename, dropDatabase, invalidate. |
| fullDocument | document | The document affected by an insert, replace, delete, or update operation. For insert and replace operations, this is the new document. For delete operations, the field is omitted because the document no longer exists. For update operations, the field is present only if fullDocument is configured as updateLookup. |
| ns | document | The namespace (database and collection) affected by the event. |
| ns.db | string | The database name. |
| ns.coll | string | The collection name. Omitted for dropDatabase operations. |
| to | document | For rename operations, the new namespace of the collection. Omitted for all other operations. |
| to.db | string | The new database name. |
| to.coll | string | The new collection name. |
| documentKey | document | The _id of the document modified by the operation. For sharded collections, it also contains the shard key fields. |
| updateDescription | document | A document describing the fields modified by an update operation. Present only for update events. |
| updateDescription.updatedFields | document | The fields modified by the update operation, each mapped to its new value. |
| updateDescription.removedFields | array | The names of the fields removed by the update operation. |
| updateDescription.truncatedArrays | array | Array truncations performed by pipeline-based updates that use one or more of the following stages: $addFields, $set, $replaceRoot, $replaceWith. |
| updateDescription.truncatedArrays.field | string | The name of the truncated array field. |
| updateDescription.truncatedArrays.newSize | integer | The number of elements in the truncated array. |
| clusterTime | Timestamp | The timestamp of the oplog entry associated with the event. Events from the same multi-document transaction share the same clusterTime value. |
| txnNumber | NumberLong | The transaction number. Present only when the operation is part of a multi-document transaction. |
| lsid | document | The identifier of the session associated with the transaction. Present only when the operation is part of a multi-document transaction. |
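
As a rough illustration of how a consumer might branch on operationType, the following Python sketch decodes one message and sorts it into a coarse category. It assumes the Kafka message value is a UTF-8 JSON string containing the change event, which depends on how the connector is configured. The names parse_event and classify are hypothetical, and the Kafka client wiring is left out.

```python
import json

# The eight operation types listed above, grouped by what they affect.
DOCUMENT_OPS = {"insert", "update", "replace", "delete"}
NAMESPACE_OPS = {"drop", "rename", "dropDatabase"}


def parse_event(raw: bytes) -> dict:
    """Decode one message value, assumed to be a UTF-8 JSON change event."""
    return json.loads(raw.decode("utf-8"))


def classify(event: dict) -> str:
    """Return a coarse category for the event based on operationType."""
    op = event.get("operationType")
    if op in DOCUMENT_OPS:
        return "document"
    if op in NAMESPACE_OPS:
        return "namespace"
    if op == "invalidate":
        return "invalidate"
    return "unknown"


# Sample payload made up for this sketch.
raw = b'{"operationType": "insert", "ns": {"db": "engineering", "coll": "users"}}'
event = parse_event(raw)
print(classify(event), event["ns"]["db"], event["ns"].get("coll"))
```

Reading ns.coll with get() reflects the table above: the field is omitted for dropDatabase events.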

Event List

Insert Event

{
  _id: { < Resume Token > },
  operationType: 'insert',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  documentKey: {
    userName: 'alice123',
    _id: ObjectId("599af247bb69cd8996xxxxxx")
  },
  fullDocument: {
    _id: ObjectId("599af247bb69cd8996xxxxxx"),
    userName: 'alice123',
    name: 'Alice'
  }
}
The documentKey field contains both the _id and userName fields, which indicates that the engineering.users collection is sharded with a shard key on userName and _id.
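
The observation about documentKey can be sketched in Python as follows. The helper name shard_key_fields is hypothetical, and the event is assumed to be already decoded into a dict. An empty result does not prove that the collection is unsharded, since a collection sharded on _id alone would also produce a documentKey with only _id.

```python
def shard_key_fields(document_key: dict) -> list:
    """Return the documentKey field names other than _id, in their original order."""
    return [name for name in document_key if name != "_id"]


# documentKey from the insert event above, with the ObjectId shown as a plain string.
document_key = {"userName": "alice123", "_id": "599af247bb69cd8996xxxxxx"}
print(shard_key_fields(document_key))
```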

Update Event

{
  _id: { < Resume Token > },
  operationType: 'update',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  documentKey: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
  },
  updateDescription: {
    updatedFields: {
      email: 'alice@10gen.com'
    },
    removedFields: ['phoneNumber'],
    truncatedArrays: [ {
      "field" : "vacation_time",
      "newSize" : 36
    } ]
  }
}
The following example shows an update event when the fullDocument option is set to updateLookup:
{
  _id: { < Resume Token > },
  operationType: 'update',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  documentKey: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx")
  },
  updateDescription: {
    updatedFields: {
      email: 'alice@10gen.com'
    },
    removedFields: ['phoneNumber'],
    truncatedArrays: [ {
      "field" : "vacation_time",
      "newSize" : 36
    } ]
  },
  fullDocument: {
    _id: ObjectId("58a4eb4a30c75625e0xxxxxx"),
    name: 'Alice',
    userName: 'alice123',
    email: 'alice@10gen.com',
    team: 'replication'
  }
}
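
When fullDocument is not included, a consumer that keeps its own copy of a document can apply updateDescription to that copy. The Python sketch below shows one way this might look. It is a simplification under stated assumptions: field paths use dot notation for nested documents, paths that pass through arrays are skipped, and the helper names and the "before" values are made up for illustration.

```python
import copy


def _parent_of(doc, path, create=False):
    """Walk a dotted path and return (parent_dict, leaf_name).

    Returns (None, leaf_name) if the path cannot be followed through dicts.
    """
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        nxt = node.get(key)
        if nxt is None and create:
            nxt = node[key] = {}
        if not isinstance(nxt, dict):
            return None, leaf
        node = nxt
    return node, leaf


def apply_update(doc, update_description):
    """Return a copy of doc with the described changes applied."""
    result = copy.deepcopy(doc)
    # Shrink each truncated array to its reported size.
    for entry in update_description.get("truncatedArrays", []):
        parent, leaf = _parent_of(result, entry["field"])
        if parent is not None and isinstance(parent.get(leaf), list):
            del parent[leaf][entry["newSize"]:]
    # Overwrite or add every updated field.
    for path, value in update_description.get("updatedFields", {}).items():
        parent, leaf = _parent_of(result, path, create=True)
        if parent is not None:
            parent[leaf] = value
    # Drop every removed field.
    for path in update_description.get("removedFields", []):
        parent, leaf = _parent_of(result, path)
        if parent is not None:
            parent.pop(leaf, None)
    return result


# Hypothetical local copy of the document before the update.
before = {
    "name": "Alice",
    "email": "alice@example.com",
    "phoneNumber": "000",
    "vacation_time": list(range(40)),
}
# updateDescription taken from the update event above.
update_description = {
    "updatedFields": {"email": "alice@10gen.com"},
    "removedFields": ["phoneNumber"],
    "truncatedArrays": [{"field": "vacation_time", "newSize": 36}],
}
after = apply_update(before, update_description)
print(after["email"], "phoneNumber" in after, len(after["vacation_time"]))
```

Working on a deep copy keeps the original document untouched, which makes it easier to compare the state before and after an event.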

Replace Event

{
  _id: { < Resume Token > },
  operationType: 'replace',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  documentKey: {
    _id: ObjectId("599af247bb69cd8996xxxxxx")
  },
  fullDocument: {
    _id: ObjectId("599af247bb69cd8996xxxxxx"),
    userName: 'alice123',
    name: 'Alice'
  }
}
A replace operation is performed in two steps:
1. Delete the original document identified by the documentKey.
2. Insert a new document with the same documentKey.
For a replace event, the fullDocument field contains the newly inserted document.

Delete Event

{
  _id: { < Resume Token > },
  operationType: 'delete',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  documentKey: {
    _id: ObjectId("599af247bb69cd8996xxxxxx")
  }
}
In a delete event message, the fullDocument field is omitted because the document no longer exists.
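
The insert, replace, and delete events described above are enough to keep a simple local mirror of a collection. The following Python sketch shows the idea. It assumes events have been decoded into dicts whose values are plain JSON types, and the names key_of and apply_to_mirror are hypothetical. Update events are ignored here for brevity.

```python
import json


def key_of(event: dict) -> tuple:
    """Build a hashable key from the namespace and documentKey of an event."""
    ns = event["ns"]
    return (ns["db"], ns["coll"], json.dumps(event["documentKey"], sort_keys=True))


def apply_to_mirror(mirror: dict, event: dict) -> None:
    """Apply one insert, replace, or delete event to the local mirror."""
    op = event["operationType"]
    if op in ("insert", "replace"):
        # Both carry the complete new document in fullDocument.
        mirror[key_of(event)] = event["fullDocument"]
    elif op == "delete":
        # Delete events have no fullDocument, only the documentKey.
        mirror.pop(key_of(event), None)


ns = {"db": "engineering", "coll": "users"}
doc_key = {"_id": "599af247bb69cd8996xxxxxx"}
events = [
    {"operationType": "insert", "ns": ns, "documentKey": doc_key,
     "fullDocument": {"_id": doc_key["_id"], "name": "Alice"}},
    {"operationType": "replace", "ns": ns, "documentKey": doc_key,
     "fullDocument": {"_id": doc_key["_id"], "userName": "alice123", "name": "Alice"}},
    {"operationType": "delete", "ns": ns, "documentKey": doc_key},
]

mirror = {}
for event in events[:2]:
    apply_to_mirror(mirror, event)
print(len(mirror), list(mirror.values())[0]["userName"])

apply_to_mirror(mirror, events[2])
print(len(mirror))
```

Treating replace as an overwrite of the stored document matches the two-step delete and insert behavior with the same documentKey.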

Drop Event

{
  _id: { < Resume Token > },
  operationType: 'drop',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  }
}
This event is triggered when a collection is dropped. It causes a connector subscribed to that collection to generate an invalidate event.

Rename Event

{
  _id: { < Resume Token > },
  operationType: 'rename',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering',
    coll: 'users'
  },
  to: {
    db: 'engineering',
    coll: 'people'
  }
}
This event is triggered when a collection is renamed. It causes a connector subscribed to that collection to generate an invalidate event.

Drop Database Event

{
  _id: { < Resume Token > },
  operationType: 'dropDatabase',
  clusterTime: <Timestamp>,
  ns: {
    db: 'engineering'
  }
}
This event is triggered when a database is dropped. It causes a connector subscribed to that database to generate an invalidate event.
Before the dropDatabase event is generated, a drop event is generated for each collection in the database.

Invalidate Event

{
  _id: { < Resume Token > },
  operationType: 'invalidate',
  clusterTime: <Timestamp>
}
For a connector subscribed to a collection, an invalidate event is generated when a drop, rename, or dropDatabase operation affects that collection.
For a connector subscribed to a database, an invalidate event is generated when a dropDatabase operation affects that database.
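
A consumer may want to stop normal processing once it sees an invalidate event. The Python sketch below shows one possible shape for that logic, assuming events arrive as decoded dicts in topic order. The function name consume_until_invalidate and the token values are hypothetical, and what to do with the last token after an invalidate event depends on the connector configuration and is not covered here.

```python
def consume_until_invalidate(events):
    """Process events in order and stop at the first invalidate event.

    Returns (processed_operation_types, last_event_id, invalidated).
    """
    processed = []
    last_id = None
    for event in events:
        last_id = event.get("_id")
        if event.get("operationType") == "invalidate":
            return processed, last_id, True
        processed.append(event["operationType"])
    return processed, last_id, False


# A database drop as seen by a database-level subscription: one drop event
# per collection, then dropDatabase, then invalidate.
events = [
    {"_id": {"_data": "t1"}, "operationType": "drop",
     "ns": {"db": "engineering", "coll": "users"}},
    {"_id": {"_data": "t2"}, "operationType": "drop",
     "ns": {"db": "engineering", "coll": "teams"}},
    {"_id": {"_data": "t3"}, "operationType": "dropDatabase",
     "ns": {"db": "engineering"}},
    {"_id": {"_data": "t4"}, "operationType": "invalidate"},
]
processed, last_id, invalidated = consume_until_invalidate(events)
print(processed, last_id, invalidated)
```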
