MongoDB has supported Change Streams since version 3.6 (with many enhancements in 4.0 and 4.2) for subscribing to modifications within MongoDB. Change Streams can be used for incremental data migration or synchronization between MongoDB deployments, and for feeding MongoDB's incremental changes into other related systems. For example, in an e-commerce scenario where new orders are stored in MongoDB, the business needs to notify the inventory management system to ship goods based on the new order information.
Change Stream versus Tailing Oplog
Before the Change Stream feature existed, to obtain incremental modifications from MongoDB you could continuously tail the oplog: keep pulling incremental oplog entries, then filter the pulled entries for those that match your criteria. This approach satisfies most scenarios, but has several shortcomings:
- Higher barrier to entry: users must open a tailable cursor on the oplog collection with special options ({tailable: true, awaitData: true}); see the sketch after this list.
- Users must manage resume progress themselves. After a crash, users need to have recorded fields such as ts and h of the last pulled oplog entry in order to locate that entry and continue pulling next time.
- Result filtering must be done on the pulling side: even when you only need to subscribe to some oplog entries, such as those for one DB, one collection, or certain operation types, you still have to pull the full oplog stream before filtering.
- For update operations, the oplog contains only the modification itself, such as {$set: {x: 1}}, while applications often need the full document content.
- Subscriptions on a sharded cluster are not supported. Users must tail the oplog on each shard separately and ensure that no moveChunk operations occur during the process, otherwise the results may be out of order.
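A minimal mongo shell sketch of this legacy approach, assuming a replica set; the saved position lastTs and the namespace filter are illustrative, not from the original text:

var lastTs = Timestamp(1577934389, 1);                  // hypothetical persisted resume position
var oplog = db.getSiblingDB("local").oplog.rs;
var cursor = oplog.find({ ts: { $gt: lastTs }, ns: "test.coll" })
                  .addOption(DBQuery.Option.tailable)   // keep the cursor open at the oplog tail
                  .addOption(DBQuery.Option.awaitData); // block briefly waiting for new entries
while (cursor.hasNext()) {
    var entry = cursor.next();
    // apply the entry, then persist entry.ts (and entry.h) as the new resume position
}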
MongoDB Change Streams address these shortcomings of tailing the oplog:
- Easy to use: a unified Change Stream API retrieves incremental modifications from the MongoDB server with a single API call.
- Unified progress management: each result carries a resume token that identifies its position; to resume from the last position, simply pass the resume token of the last result into the next API call.
- Supports server-side pipeline filtering of results, reducing network transfer; results can be filtered by dimensions such as DB, collection, and operation type (see the example after this list).
- Supports the fullDocument: 'updateLookup' option: for update operations, it returns the complete content of the corresponding document at that time.
- Supports subscribing to modifications on a sharded cluster: the same API request, sent to mongos, returns globally ordered modifications at the cluster level.
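For instance, a minimal sketch of server-side filtering in the mongo shell; only insert events are shipped back to the client (the collection name matches the examples below):

var cs = db.coll.watch([ { $match: { operationType: "insert" } } ]);
while (cs.hasNext()) {
    printjson(cs.next());   // the server has already filtered out non-insert events
}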
Change Stream in Practice
Using a Change Stream from the mongo shell is very simple; the shell wraps the subscription operation at the instance, DB, and collection levels.
- db.getMongo().watch() subscribes to modifications on the entire instance
- db.watch() subscribes to modifications on the specified DB
- db.collection.watch() subscribes to modifications on the specified collection
- New connection 1 initiates the subscription:
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000})   // blocks for up to 1 minute
- New connection 2 writes new data:
mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })
- Connection 1 receives the Change Stream updates:
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
- In the Change Stream results above, the _id field is the resume token, which identifies a position in the oplog. To continue subscribing from a given position, specify it via resumeAfter when calling watch(). For example, suppose an application has subscribed to the three changes above but only successfully consumed the first one; by specifying the resume token of the first change at the next subscription, it can receive the latter two again.
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
Change Stream internal implementation
watch() wrapper
db.watch() is really just an API wrapper: internally, a Change Stream is an aggregation command with a special $changeStream stage added. After a Change Stream subscription is initiated, you can see the detailed parameters of the corresponding aggregate/getMore operations via db.currentOp():
{ "op" : "getmore", "ns" : "test.coll", "command" : { "getMore" : NumberLong("233479991942333714"), "collection" : "coll", "maxTimeMS" : 50000, "lsid" : { "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107") }, }, "planSummary" : "COLLSCAN", "cursor" : { "cursorId" : NumberLong("233479991942333714"), "createdDate" : ISODate("2019-12-31T06:35:52.479Z"), "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"), "nDocsReturned" : NumberLong(1), "nBatchesReturned" : NumberLong(1), "noCursorTimeout" : false, "tailable" : true, "awaitData" : true, "originatingCommand" : { "aggregate" : "coll", "pipeline" : [ { "$changeStream" : { "fullDocument" : "default" } } ], "cursor" : { }, "lsid" : { "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107") }, "$clusterTime" : { "clusterTime" : Timestamp(1577774144, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "$db" : "test" }, "operationUsingCursorId" : NumberLong(7019500) }, "numYields" : 2, "locks" : { } }
resume token
A resume token describes a subscription position and is essentially an encapsulation of oplog information, including clusterTime, uuid, documentKey, and so on. When a resume token is passed to the subscription API, MongoDB Server decodes it back into this information, locates the corresponding starting point in the oplog, and continues the subscription from there.
struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};
The ResumeTokenData structure contains version information: before 4.0.7 the version was 0; 4.0.7 introduced a new resume token format, version 1; and in 3.6 the resume token encoding differs from 4.0 altogether. A resume token produced by one version may therefore not be recognized by another after an upgrade, so try to keep all MongoDB Server components (replica set members, config servers, mongos) on the same kernel version.
For more detail, see https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability
updateLookup
For update operations, Change Streams support returning the full content of the document as it stands, rather than just the update operation itself. For example:
mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
By default, the Change Stream receives {_id: 101}, {$set: {age: 20}} and contains no information about the document's other, unmodified fields; with the fullDocument: 'updateLookup' option, the Change Stream looks up the current content of the document by its _id and returns it.
Note that the updateLookup option only guarantees eventual consistency. For example, if the document above is updated 100 times in a row, the Change Stream will not receive every intermediate state in order: each event looks up the document's content at that moment, and that content may already have been overwritten by subsequent modifications.
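A minimal sketch of subscribing with updateLookup; the commented output is plausible for the update above, not a verbatim transcript:

var cs = db.coll.watch([], { fullDocument: "updateLookup" });
// After db.coll.update({_id: 101}, {$set: {age: 20}}), the next event's
// fullDocument field holds the whole document as looked up at that moment, e.g.
//   { "_id" : 101, "name" : "jack", "age" : 20 }
// Under rapid successive updates this may already reflect a later state.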
Sharded cluster
Change Streams support subscriptions on a sharded cluster and guarantee globally ordered results; to achieve this, mongos sorts and merges the subscription results returned by each shard by timestamp.
In extreme cases, if some shards receive very few or no writes, the Change Stream's return latency suffers, because mongos must wait until all shards have returned subscription results. By default, mongod generates a special Noop oplog entry every 10s, which indirectly keeps the sharded cluster's Change Stream advancing even when writes are sparse.
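To my knowledge this noop behavior is governed by the mongod server parameters writePeriodicNoops and periodicNoopIntervalSecs; a sketch of inspecting them (availability may vary by version; the values shown are the documented defaults):

db.adminCommand({ getParameter: 1, writePeriodicNoops: 1, periodicNoopIntervalSecs: 1 })
// e.g. { "writePeriodicNoops" : true, "periodicNoopIntervalSecs" : 10, "ok" : 1 }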
Because of the need for global sorting, Change Stream performance may fall short when the sharded cluster's write volume is high; if performance requirements are strict, consider disabling the Balancer and creating a Change Stream on each shard individually.
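A sketch of that per-shard alternative, with hypothetical shard host names; the application must then merge and order events across shards itself, and tolerate events caused by chunk migrations:

var shard = new Mongo("shard1.example.net:27017");   // hypothetical shard primary
var cs = shard.getDB("test").coll.watch();
while (cs.hasNext()) {
    printjson(cs.next());   // per-shard events only; global ordering is up to the application
}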