Change streams and multi-document transactions bring event-driven and ACID capabilities to MongoDB while preserving document flexibility. Both require a replica set or sharded cluster.

Change Streams Overview

Change streams watch collections, databases, or entire deployments for insert, update, delete, replace, and invalidate operations. They use the replication oplog internally.

  Collection change → Oplog → Change Stream → Application consumer
  

Use cases: cache invalidation, sync to Elasticsearch, real-time dashboards, audit trails, event sourcing.

Basic Change Stream

  // Watch a collection
const pipeline = [
  { $match: { operationType: { $in: ["insert", "update", "replace"] } } }
]

const changeStream = db.orders.watch(pipeline)

changeStream.forEach(change => {
  print(change.operationType, change.documentKey._id)
  if (change.fullDocument) {
    printjson(change.fullDocument)
  }
})
  

Change Event Structure

  {
  _id: { _data: "8263..." },           // resume token
  operationType: "update",
  clusterTime: Timestamp(...),
  ns: { db: "shop", coll: "orders" },
  documentKey: { _id: ObjectId("...") },
  updateDescription: {
    updatedFields: { status: "shipped" },
    removedFields: []
  },
  fullDocument: { ... }  // if fullDocument lookup enabled
}
  

Resume Tokens

Consumers must survive restarts without missing events:

  let resumeToken = loadFromStorage()  // persist to Redis, file, etc.

const stream = db.orders.watch([], {
  resumeAfter: resumeToken,
  fullDocument: "updateLookup"  // fetch full doc on updates
})

stream.on("change", change => {
  resumeToken = change._id
  saveToStorage(resumeToken)
  processChange(change)
})

stream.on("error", err => {
  if (err.code === 286) {  // history lost — oplog rolled
    // re-sync from last known state
  }
})
  

Resume tokens expire when oplog entries are overwritten — size oplog appropriately:

  replication:
  oplogSizeMB: 10240  # 10 GB for high-change workloads
  

Database and Deployment-Level Streams

  // All collections in a database
db.watch([{ $match: { operationType: "insert" } }])

// Entire deployment (admin privilege required)
const adminDb = db.getSiblingDB("admin")
adminDb.watch()
  

Node.js Change Stream Example

  const { MongoClient } = require('mongodb');

async function watchOrders() {
  const client = new MongoClient(uri);
  await client.connect();
  const collection = client.db('shop').collection('orders');

  const changeStream = collection.watch(
    [{ $match: { 'fullDocument.status': 'completed' } }],
    { fullDocument: 'updateLookup' }
  );

  for await (const change of changeStream) {
    await syncToElasticsearch(change.fullDocument);
  }
}
  

Handle backpressure — if processing is slower than write rate, events queue in the oplog window.

Multi-Document Transactions

Transactions provide ACID guarantees across multiple documents and collections:

  const session = db.getMongo().startSession();

session.startTransaction({
  readConcern: { level: "snapshot" },
  writeConcern: { w: "majority" }
});

try {
  const shop = session.getDatabase("shop");
  const orders = shop.orders;
  const inventory = shop.inventory;

  orders.insertOne({
    userId: ObjectId("..."),
    items: [{ sku: "MOUSE-01", qty: 1, price: NumberDecimal("29.99") }],
    total: NumberDecimal("29.99"),
    status: "confirmed"
  }, { session });

  const result = inventory.updateOne(
    { sku: "MOUSE-01", stock: { $gte: 1 } },
    { $inc: { stock: -1 } },
    { session }
  );

  if (result.modifiedCount === 0) {
    throw new Error("Insufficient stock");
  }

  session.commitTransaction();
} catch (error) {
  session.abortTransaction();
  throw error;
} finally {
  session.endSession();
}
  

Transaction Options

Option Values Recommendation
readConcern local, majority, snapshot snapshot for consistent reads
writeConcern w: 1, w: "majority" majority for durability
readPreference primary Always primary for transactions
maxCommitTimeMS milliseconds Set for long operations

Transaction Patterns

Transfer Between Accounts

  session.startTransaction();
try {
  db.accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } }, { session });
  db.accounts.updateOne({ _id: toId }, { $inc: { balance: amount } }, { session });
  db.transactions.insertOne({ from: fromId, to: toId, amount }, { session });
  session.commitTransaction();
} catch (e) {
  session.abortTransaction();
}
  

Idempotent Transaction with Retry

  async function withRetry(txnFn, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    const session = client.startSession();
    try {
      session.startTransaction();
      await txnFn(session);
      await session.commitTransaction();
      return;
    } catch (err) {
      await session.abortTransaction();
      if (err.hasErrorLabel('TransientTransactionError')) continue;
      throw err;
    } finally {
      session.endSession();
    }
  }
}
  

Retry on TransientTransactionError and UnknownTransactionCommitResult.

Use Case Matrix

Feature Use Case Alternative
Change streams Cache invalidation TTL + polling
Change streams Sync to search index $out batch ETL
Change streams Real-time notifications Message queue (Kafka)
Transactions Order + inventory Single-document atomic update
Transactions Financial transfers Required — no alternative
Transactions Multi-collection consistency Embed in one document

Limitations

Change Streams

  • Require replica set or sharded cluster
  • Do not capture DDL (create/drop collection) by default in all versions
  • Oplog window limits resume capability
  • High-volume streams need consumer scaling (multiple workers with partition logic)

Transactions

  • 60-second default timeout — keep transactions short
  • 16 MB document size limit per operation
  • Performance overhead vs single-document operations
  • Cross-shard transactions have higher latency
  • Cannot perform cross-database transactions in all configurations

When NOT to Use Transactions

Prefer single-document atomicity when possible:

  // Atomic — no transaction needed
db.inventory.updateOne(
  { sku: "MOUSE-01", stock: { $gte: 1 } },
  { $inc: { stock: -1 } }
)
  

MongoDB guarantees atomicity at the document level — use transactions only when multiple documents must be consistent.

Common Mistakes

  • Long-running transactions holding locks — keep under 1 second
  • Not handling TransientTransactionError with retry logic
  • Change stream consumer without resume token persistence
  • Using transactions for every write — unnecessary overhead
  • Not sizing oplog for change stream retention requirements
  • Watching high-volume collections without $match filtering

Troubleshooting

Transaction aborted: WriteConflict

Concurrent update to same document — retry with exponential backoff.

Change stream error 286 (ChangeStreamHistoryLost)

Oplog entry expired — consumer must do full re-sync:

  const stream = db.orders.watch([], { startAtOperationTime: lastKnownTime })
  

Transaction timeout

  session.startTransaction({ maxCommitTimeMS: 30000 })
// Break large operations into smaller transactions
  

Production Scenario: Order Processing

  1. Order placed → insertOne (single doc, atomic)
2. Change stream detects insert → trigger fulfillment workflow
3. Fulfillment → transaction: update order status + decrement inventory + create shipment
4. Change stream on shipment → notify customer via push notification
  

Combine change streams (async events) with transactions (consistent state changes).

Best Practices

  1. Keep transactions short — under 1 second ideal
  2. Persist change stream resume tokens durably
  3. Filter change streams with $match to reduce noise
  4. Size oplog for change stream recovery window (24+ hours)
  5. Use single-document operations when they suffice
  6. Test failover behavior — transactions abort on primary step-down

What Comes Next

Expert deep dives cover MongoDB Atlas cloud management, advanced sharding, and production operations.