Change Streams and Transactions
Change streams and multi-document transactions bring event-driven and ACID capabilities to MongoDB while preserving document flexibility. Both require a replica set or sharded cluster.
Change Streams Overview
Change streams watch collections, databases, or entire deployments for insert, update, delete, replace, and invalidate operations. They use the replication oplog internally.
Collection change → Oplog → Change Stream → Application consumer
Use cases: cache invalidation, sync to Elasticsearch, real-time dashboards, audit trails, event sourcing.
Basic Change Stream
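// Watch a collection (mongosh)
const pipeline = [
  { $match: { operationType: { $in: ["insert", "update", "replace"] } } }
]
const changeStream = db.orders.watch(pipeline)
// Poll with tryNext(), which returns null when no event is waiting
while (!changeStream.isClosed()) {
  const change = changeStream.tryNext()
  if (change === null) { sleep(100); continue }
  print(change.operationType, change.documentKey._id)
  // fullDocument is present on inserts and replaces; updates need fullDocument: "updateLookup"
  if (change.fullDocument) printjson(change.fullDocument)
}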
Change Event Structure
{
_id: { _data: "8263..." }, // resume token
operationType: "update",
clusterTime: Timestamp(...),
ns: { db: "shop", coll: "orders" },
documentKey: { _id: ObjectId("...") },
updateDescription: {
updatedFields: { status: "shipped" },
removedFields: []
},
fullDocument: { ... } // if fullDocument lookup enabled
}
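For the cache invalidation use case, an update event's updateDescription can be applied to a locally cached copy instead of refetching the document. The sketch below is one way to do that; the helper names are hypothetical, it mutates the cached object in place, it ignores truncatedArrays, and it assumes field names do not themselves contain dots.

```javascript
// Hypothetical helpers, not part of any MongoDB driver.

// Set a value at a dotted path such as "shipping.tracking" or "items.0.qty".
function setPath(target, path, value) {
  const keys = path.split(".");
  let node = target;
  for (const key of keys.slice(0, -1)) {
    if (node[key] === undefined || node[key] === null) node[key] = {};
    node = node[key];
  }
  node[keys[keys.length - 1]] = value;
}

// Remove the field at a dotted path; a missing parent means nothing to remove.
function unsetPath(target, path) {
  const keys = path.split(".");
  let node = target;
  for (const key of keys.slice(0, -1)) {
    node = node[key];
    if (node === undefined || node === null) return;
  }
  delete node[keys[keys.length - 1]];
}

// Apply updatedFields, then removedFields, to the cached document (in place).
function applyUpdateDescription(cachedDoc, updateDescription) {
  const updated = updateDescription.updatedFields || {};
  for (const [path, value] of Object.entries(updated)) {
    setPath(cachedDoc, path, value);
  }
  for (const path of updateDescription.removedFields || []) {
    unsetPath(cachedDoc, path);
  }
  return cachedDoc;
}
```

If the cache has no copy of the document at all, the delta is not enough on its own; in that case the event's fullDocument (with updateLookup enabled) or a fresh read is still needed.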
Resume Tokens
Consumers must survive restarts without missing events:
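// Node.js driver; `db` is a Db handle such as client.db("shop")
let resumeToken = loadFromStorage() // persisted in Redis, a file, etc.; null on first run
const options = { fullDocument: "updateLookup" } // fetch the current full document on updates
if (resumeToken) options.resumeAfter = resumeToken
const stream = db.collection("orders").watch([], options)
stream.on("change", change => {
  processChange(change)
  // Save only after processing succeeds, so a crash replays the event instead of skipping it
  resumeToken = change._id
  saveToStorage(resumeToken)
})
stream.on("error", err => {
  if (err.code === 286) { // ChangeStreamHistoryLost: the resume point has aged out of the oplog
    // re-sync from last known state
  }
})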
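The loadFromStorage and saveToStorage helpers above are left undefined. A minimal file-backed sketch follows, assuming a single consumer process and a token that serializes cleanly to JSON; the file name is a hypothetical default, and Redis or a database would work just as well.

```javascript
const fs = require("node:fs");
const path = require("node:path");

// Hypothetical default location for the persisted token.
const TOKEN_FILE = path.join(process.cwd(), "orders-resume-token.json");

// Write to a temp file, then rename over the target. On POSIX filesystems the
// rename is atomic, so a crash mid-write never leaves a half-written token.
function saveToStorage(token, file = TOKEN_FILE) {
  const tmp = `${file}.tmp`;
  fs.writeFileSync(tmp, JSON.stringify(token));
  fs.renameSync(tmp, file);
}

// Return the saved token, or null when no token has been stored yet.
function loadFromStorage(file = TOKEN_FILE) {
  try {
    return JSON.parse(fs.readFileSync(file, "utf8"));
  } catch (err) {
    if (err.code === "ENOENT") return null; // first run
    throw err;
  }
}
```

Writing the token on every event is simple but costs one file write per change; batching the saves trades a few replayed events after a crash for less I/O.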
Resume tokens expire when oplog entries are overwritten — size oplog appropriately:
replication:
oplogSizeMB: 10240 # 10 GB for high-change workloads
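How much history a given oplog size buys depends on the write rate. The arithmetic can be sketched as below; the function names, the write-rate figure, and the 1.5x headroom factor are illustrative assumptions, and the real rate should be measured on the deployment (rs.printReplicationInfo() in mongosh reports the current oplog time span).

```javascript
// Hours of history an oplog of the given size holds at a steady write rate.
function oplogWindowHours(oplogSizeMB, oplogWriteMBPerHour) {
  if (oplogWriteMBPerHour <= 0) throw new RangeError("write rate must be positive");
  return oplogSizeMB / oplogWriteMBPerHour;
}

// Oplog size needed to retain targetHours of history, with a safety margin.
function requiredOplogSizeMB(targetHours, oplogWriteMBPerHour, headroom = 1.5) {
  return Math.ceil(targetHours * oplogWriteMBPerHour * headroom);
}

// Example with an assumed rate of 512 MB of oplog per hour:
console.log(oplogWindowHours(10240, 512));  // 20
console.log(requiredOplogSizeMB(24, 512));  // 18432
```

At that assumed rate the 10 GB setting above covers about 20 hours, so a 24-hour recovery window would need a larger oplog.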
Database and Deployment-Level Streams
// All collections in a database
db.watch([{ $match: { operationType: "insert" } }])
// Entire deployment (cluster-wide privileges required)
// Note: watching the admin database is not the same thing; use the connection object
db.getMongo().watch()
Node.js Change Stream Example
const { MongoClient } = require('mongodb');
async function watchOrders() {
const client = new MongoClient(uri);
await client.connect();
const collection = client.db('shop').collection('orders');
const changeStream = collection.watch(
[{ $match: { 'fullDocument.status': 'completed' } }],
{ fullDocument: 'updateLookup' }
);
for await (const change of changeStream) {
await syncToElasticsearch(change.fullDocument);
}
}
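Handle backpressure: a change stream does not buffer events for a slow consumer. If processing is slower than the write rate, the consumer's position drifts further behind the head of the oplog, and once it falls outside the oplog window the stream can no longer be resumed.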
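One way to notice a consumer falling behind is to compare each event's cluster time with the wall clock. The sketch below takes the event time as plain epoch seconds (how to read that from change.clusterTime depends on the driver's Timestamp type and is left as an assumption); the function names and the 50% warning threshold are illustrative, not MongoDB defaults.

```javascript
// Seconds the consumer is behind, given the event's cluster time in epoch seconds.
function consumerLagSeconds(eventEpochSeconds, nowMs = Date.now()) {
  return Math.max(0, Math.floor(nowMs / 1000) - eventEpochSeconds);
}

// Classify the lag against the oplog window.
// "lost" means the consumer's position has likely aged out of the oplog.
function lagStatus(lagSeconds, oplogWindowHours, warnFraction = 0.5) {
  const windowSeconds = oplogWindowHours * 3600;
  if (lagSeconds >= windowSeconds) return "lost";
  if (lagSeconds >= windowSeconds * warnFraction) return "at-risk";
  return "ok";
}
```

An "at-risk" result is the point to add workers or shed load, since recovering from "lost" means a full re-sync.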
Multi-Document Transactions
Transactions provide ACID guarantees across multiple documents and collections:
const session = db.getMongo().startSession();
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" }
});
try {
const shop = session.getDatabase("shop");
// Collections obtained through session.getDatabase() are already bound to the session,
// so the operations below take no separate session option
const orders = shop.orders;
const inventory = shop.inventory;
orders.insertOne({
  userId: ObjectId("..."),
  items: [{ sku: "MOUSE-01", qty: 1, price: NumberDecimal("29.99") }],
  total: NumberDecimal("29.99"),
  status: "confirmed"
});
const result = inventory.updateOne(
  { sku: "MOUSE-01", stock: { $gte: 1 } },
  { $inc: { stock: -1 } }
);
if (result.modifiedCount === 0) {
throw new Error("Insufficient stock");
}
session.commitTransaction();
} catch (error) {
session.abortTransaction();
throw error;
} finally {
session.endSession();
}
Transaction Options
| Option | Values | Recommendation |
|---|---|---|
| readConcern | local, majority, snapshot | snapshot for consistent reads |
| writeConcern | w: 1, w: "majority" | majority for durability |
| readPreference | primary | Always primary for transactions |
| maxCommitTimeMS | milliseconds | Set for long operations |
Transaction Patterns
Transfer Between Accounts
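// mongosh; fromId, toId, and amount are supplied by the caller
const session = db.getMongo().startSession();
const sdb = session.getDatabase(db.getName()); // session-bound handle to the current database
session.startTransaction();
try {
  sdb.accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } });
  sdb.accounts.updateOne({ _id: toId }, { $inc: { balance: amount } });
  sdb.transactions.insertOne({ from: fromId, to: toId, amount });
  session.commitTransaction();
} catch (e) {
  session.abortTransaction();
  throw e; // surface the failure instead of swallowing it
} finally {
  session.endSession();
}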
Idempotent Transaction with Retry
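// Assumes `client` is a connected MongoClient and txnFn is safe to run more than once
async function withRetry(txnFn, maxRetries = 3) {
  let lastError;
  for (let i = 0; i < maxRetries; i++) {
    const session = client.startSession();
    try {
      session.startTransaction();
      await txnFn(session);
      await session.commitTransaction();
      return;
    } catch (err) {
      lastError = err;
      // Abort only if the transaction is still open; a failed commit may have ended it
      if (session.inTransaction()) await session.abortTransaction();
      // Non-driver errors thrown by txnFn have no hasErrorLabel method
      if (!err.hasErrorLabel?.('TransientTransactionError')) throw err;
    } finally {
      await session.endSession();
    }
  }
  throw lastError; // retries exhausted
}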
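Retry the whole transaction on TransientTransactionError. On UnknownTransactionCommitResult, retry only the commit. In the Node.js driver, session.withTransaction() handles both cases.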
Use Case Matrix
| Feature | Use Case | Alternative |
|---|---|---|
| Change streams | Cache invalidation | TTL + polling |
| Change streams | Sync to search index | $out batch ETL |
| Change streams | Real-time notifications | Message queue (Kafka) |
| Transactions | Order + inventory | Single-document atomic update |
| Transactions | Financial transfers | Required — no alternative |
| Transactions | Multi-collection consistency | Embed in one document |
Limitations
Change Streams
- Require replica set or sharded cluster
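- Capture only some DDL by default: drop, rename, and dropDatabase events are always emitted, while events such as create and createIndexes require showExpandedEvents: true (MongoDB 6.0+)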
- Oplog window limits resume capability
- High-volume streams need consumer scaling (multiple workers with partition logic)
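The partition logic mentioned in the last bullet can be as simple as hashing the document key so that every event for a given document lands on the same worker, which preserves per-document ordering. A minimal sketch, assuming a fixed worker count and an _id whose string form is stable; the function names are hypothetical.

```javascript
const crypto = require("node:crypto");

// Map a change event's documentKey to a worker index in [0, workerCount).
function workerFor(documentKey, workerCount) {
  const digest = crypto.createHash("md5").update(String(documentKey._id)).digest();
  return digest.readUInt32BE(0) % workerCount;
}

// Each worker opens its own stream and skips events owned by other workers.
function shouldHandle(change, workerIndex, workerCount) {
  return workerFor(change.documentKey, workerCount) === workerIndex;
}
```

With this approach every worker still reads the full stream and filters client-side, so it scales processing but not stream reads. Changing the worker count reassigns keys, so it should be done with all workers stopped at a known resume token.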
Transactions
- 60-second default timeout — keep transactions short
- 16 MB document size limit per operation
- Performance overhead vs single-document operations
- Cross-shard transactions have higher latency
- Can span databases, but cannot read from or write to collections in the config, admin, or local databases
When NOT to Use Transactions
Prefer single-document atomicity when possible:
// Atomic — no transaction needed
db.inventory.updateOne(
{ sku: "MOUSE-01", stock: { $gte: 1 } },
{ $inc: { stock: -1 } }
)
MongoDB guarantees atomicity at the document level — use transactions only when multiple documents must be consistent.
Common Mistakes
- Long-running transactions holding locks — keep under 1 second
- Not handling TransientTransactionError with retry logic
- Change stream consumer without resume token persistence
- Using transactions for every write — unnecessary overhead
- Not sizing oplog for change stream retention requirements
- Watching high-volume collections without $match filtering
Troubleshooting
Transaction aborted: WriteConflict
Concurrent update to same document — retry with exponential backoff.
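A minimal sketch of retry with exponential backoff, using full jitter so that conflicting writers do not retry in lockstep. The helper names, delay values, and retry count are assumptions to tune per workload; the random and sleep parameters are injectable only to make the helper testable.

```javascript
// Delay before retry number `attempt` (0-based): uniform in [0, min(maxMs, baseMs * 2^attempt)).
function backoffDelayMs(attempt, { baseMs = 50, maxMs = 2000, random = Math.random } = {}) {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Run fn, retrying on retryable errors with a growing, jittered delay.
async function retryWithBackoff(fn, options = {}) {
  const { retries = 5, isRetryable = () => true, sleep = defaultSleep, ...delayOptions } = options;
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn(attempt);
    } catch (err) {
      if (attempt >= retries || !isRetryable(err)) throw err;
      await sleep(backoffDelayMs(attempt, delayOptions));
    }
  }
}
```

For write conflicts inside a transaction, isRetryable could check the TransientTransactionError label, for example (err) => Boolean(err.hasErrorLabel && err.hasErrorLabel("TransientTransactionError")), and fn would run the whole transaction again.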
Change stream error 286 (ChangeStreamHistoryLost)
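The resume point has aged out of the oplog, so neither resumeAfter nor an older startAtOperationTime can recover it. Open a fresh stream first, then rebuild downstream state from the collection itself:
const stream = db.orders.watch() // no resume option: starts from the current time
// then re-read db.orders and overwrite the downstream copy before applying new events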
Transaction timeout
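session.startTransaction({ maxCommitTimeMS: 30000 }) // bounds only the commit step
// The 60-second lifetime itself is the server parameter transactionLifetimeLimitSeconds
// Break large operations into smaller transactions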
Production Scenario: Order Processing
1. Order placed → insertOne (single doc, atomic)
2. Change stream detects insert → trigger fulfillment workflow
3. Fulfillment → transaction: update order status + decrement inventory + create shipment
4. Change stream on shipment → notify customer via push notification
Combine change streams (async events) with transactions (consistent state changes).
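Steps 2 and 4 of the scenario amount to routing each change event to a handler by collection and operation type. A small sketch of that dispatch follows; the shipments collection name and the handler names in the usage comment are hypothetical, since the scenario does not name them.

```javascript
// Route a change event to a handler keyed by "<collection>:<operationType>".
// Events with no registered handler are ignored and return undefined.
function routeChange(change, handlers) {
  const key = `${change.ns.coll}:${change.operationType}`;
  const handler = handlers[key];
  return handler ? handler(change) : undefined;
}

// Usage sketch with hypothetical handlers:
// const handlers = {
//   "orders:insert": (change) => startFulfillment(change.fullDocument),
//   "shipments:insert": (change) => notifyCustomer(change.fullDocument),
// };
// for await (const change of changeStream) await routeChange(change, handlers);
```

Keeping the handlers idempotent matters here, because resuming from a saved token can deliver the same event more than once.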
Best Practices
- Keep transactions short — under 1 second ideal
- Persist change stream resume tokens durably
- Filter change streams with $match to reduce noise
- Size oplog for change stream recovery window (24+ hours)
- Use single-document operations when they suffice
- Test failover behavior — transactions abort on primary step-down
What Comes Next
Expert deep dives cover MongoDB Atlas cloud management, advanced sharding, and production operations.