Message Queues in Node.js
Message queues decouple producers from consumers, absorb traffic spikes, and enable retry logic without blocking HTTP responses. Node.js services commonly use Redis-backed BullMQ or RabbitMQ for production workloads.
When to Use a Queue
| Pattern | Example |
|---|---|
| Fire-and-forget | Send welcome email after signup |
| Background processing | Generate PDF invoice |
| Rate-limited work | Call third-party API with quota |
| Event broadcasting | Order placed → inventory + analytics + email |
| Scheduled jobs | Nightly report generation |
If the user does not need the result immediately, queue it.
Architecture
API Server ──publish──► Queue (Redis/RabbitMQ) ──consume──► Worker Process(es)
    │                                                           │
    └── 202 Accepted                                            └── Side effects
Separate API processes (stateless, fast) from worker processes (long-running, retryable).
BullMQ with Redis
npm install bullmq ioredis
Producer (API)
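import express from 'express';
import { Queue } from 'bullmq';
import { createUser } from './user-service'; // assumed app-specific helper; path is illustrative

const app = express();
app.use(express.json()); // parse JSON bodies so req.body is populated

const emailQueue = new Queue('email', {
  connection: { host: 'localhost', port: 6379 },
});

app.post('/register', async (req, res) => {
  const user = await createUser(req.body);
  await emailQueue.add('welcome', { userId: user.id, email: user.email }, {
    attempts: 3,                                   // total attempts, including the first
    backoff: { type: 'exponential', delay: 2000 }, // 2s base delay, growing per retry
    removeOnComplete: 1000,                        // keep only the latest 1000 completed jobs
  });
  res.status(202).json({ id: user.id }); // respond before the email is sent
});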
Consumer (Worker)
import { Worker } from 'bullmq';
import { sendWelcomeEmail } from './email-service';

// The queue name must match the producer's queue ('email').
const worker = new Worker('email', async (job) => {
  if (job.name === 'welcome') {
    await sendWelcomeEmail(job.data.email);
  }
}, {
  connection: { host: 'localhost', port: 6379 },
  concurrency: 5, // up to 5 jobs in flight per worker process
});

// job can be undefined here, hence the optional chaining.
worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed:`, err.message);
});
Run workers as separate processes (PM2, Kubernetes Deployments) — never inside the HTTP server process in production.
Job Options and Reliability
await queue.add('process-payment', payload, {
  jobId: `payment-${orderId}`,                   // idempotency key
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  delay: 5000,                                   // schedule 5s later
  priority: 1,                                   // lower number = higher priority
});
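To make the retry timing concrete, the sketch below lists the wait before each retry. It assumes the doubling schedule that BullMQ documents for its built-in exponential strategy (base delay times 2^(attemptsMade - 1)); `retryDelays` is a hypothetical helper, not a BullMQ API, and jitter or a custom strategy would change the numbers.

```typescript
// Hypothetical helper (not part of BullMQ): wait before each retry, assuming
// delay * 2^(attemptsMade - 1) for the "exponential" backoff type.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt is not preceded by a backoff wait; only retries are.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1)));
  }
  return delays;
}

// attempts: 5 and delay: 1000, as in the options above
const schedule = retryDelays(5, 1000);
console.log(schedule); // [ 1000, 2000, 4000, 8000 ]
console.log(schedule.reduce((sum, ms) => sum + ms, 0)); // 15000
```

Under that assumption, a job with these options spends about 15 seconds in backoff before its final attempt, on top of the initial 5-second `delay` and the processing time of each attempt.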
Idempotent Consumers
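BullMQ and RabbitMQ (with manual acknowledgements) deliver at least once, so the same job can reach a handler more than once, for example after a worker crash or a stalled job. Design handlers to be safe on retry: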
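async function processPayment(orderId: string, amount: number): Promise<void> {
  // Skip work already done by an earlier delivery of the same job.
  const existing = await db.payment.findUnique({ where: { orderId } });
  if (existing?.status === 'completed') {
    return; // already processed
  }
  // Two concurrent deliveries can both reach this point, so chargeCard should
  // itself be idempotent (for example, keyed on orderId at the payment provider).
  await chargeCard(orderId, amount);
  await db.payment.upsert({
    where: { orderId },
    create: { orderId, status: 'completed', amount },
    update: { status: 'completed' },
  });
}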
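The check-then-charge handler above leaves a window in which two concurrent deliveries of the same job both pass the status check. One way to narrow it is to claim the order before charging. The sketch below uses an in-memory `Map` as a stand-in for a table with a unique constraint on `orderId`; `PaymentStore`, `claim`, `release`, and `handleOnce` are hypothetical names for illustration only.

```typescript
type Status = 'pending' | 'completed';

// In-memory stand-in for a payments table with a UNIQUE constraint on orderId.
class PaymentStore {
  private rows = new Map<string, Status>();

  // True only for the first caller; mirrors an INSERT that fails on conflict.
  claim(orderId: string): boolean {
    if (this.rows.has(orderId)) return false;
    this.rows.set(orderId, 'pending');
    return true;
  }

  complete(orderId: string): void {
    this.rows.set(orderId, 'completed');
  }

  // Drops the claim so a later retry can take it again.
  release(orderId: string): void {
    this.rows.delete(orderId);
  }
}

async function handleOnce(
  store: PaymentStore,
  orderId: string,
  charge: (orderId: string) => Promise<void>,
): Promise<'charged' | 'skipped'> {
  if (!store.claim(orderId)) return 'skipped'; // duplicate delivery
  try {
    await charge(orderId);
    store.complete(orderId);
    return 'charged';
  } catch (err) {
    store.release(orderId); // let the queue's retry claim the order again
    throw err;
  }
}
```

A production version would also need to expire `pending` rows left behind by a worker that crashed mid-charge, and would still benefit from an idempotency key on the payment provider side.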
Dead Letter Queues (DLQ)
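Jobs that still fail after exhausting their retries need inspection. A common pattern is to copy them to a separate queue that acts as the dead letter queue: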
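import { Queue, QueueEvents } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };
const emailQueue = new Queue('email', { connection });
const dlq = new Queue('email-dlq', { connection }); // queue name is an illustrative assumption
const events = new QueueEvents('email', { connection });

events.on('failed', async ({ jobId, failedReason }) => {
  const job = await emailQueue.getJob(jobId);
  // Forward only when no attempts remain; attempts defaults to 1 when unset.
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    await dlq.add('failed-email', {
      original: job.data,
      reason: failedReason,
    });
  }
});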
Monitor DLQ depth — sustained growth signals a systemic bug or downstream outage.
RabbitMQ Alternative
Use RabbitMQ when you need complex routing (topic exchanges, fanout):
import amqp from 'amqplib';

const conn = await amqp.connect(process.env.AMQP_URL!);
const channel = await conn.createChannel();
await channel.assertQueue('orders', { durable: true });

// Publish (persistent)
channel.sendToQueue('orders', Buffer.from(JSON.stringify(order)), {
  persistent: true,
});

// Consume (manual ack)
channel.consume('orders', async (msg) => {
  if (!msg) return;
  try {
    await handleOrder(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  } catch (err) {
    channel.nack(msg, false, false); // send to DLX if configured
  }
});
Configure dead-letter exchanges on the queue for poison messages.
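As a rough sketch of that configuration, the function below declares a dead-letter exchange, a holding queue bound to it, and the work queue pointing at the exchange. It is written against a minimal structural interface so it can be read and exercised on its own; an amqplib channel provides `assertExchange`, `assertQueue` (with the `deadLetterExchange` option), and `bindQueue`. The names `orders.dlx` and `orders.dead`, and the helper `assertOrderTopology`, are assumptions for illustration.

```typescript
// Minimal structural view of the amqplib Channel methods this sketch uses.
interface TopologyChannel {
  assertExchange(name: string, type: string, options?: { durable?: boolean }): Promise<unknown>;
  assertQueue(
    name: string,
    options?: { durable?: boolean; deadLetterExchange?: string },
  ): Promise<unknown>;
  bindQueue(queue: string, exchange: string, pattern: string): Promise<unknown>;
}

// Exchange and queue names are illustrative assumptions.
async function assertOrderTopology(channel: TopologyChannel): Promise<void> {
  // Exchange that receives messages rejected with requeue=false.
  await channel.assertExchange('orders.dlx', 'fanout', { durable: true });
  // Holding queue for inspection and manual replay.
  await channel.assertQueue('orders.dead', { durable: true });
  await channel.bindQueue('orders.dead', 'orders.dlx', '');
  // Work queue: channel.nack(msg, false, false) now routes to orders.dlx.
  await channel.assertQueue('orders', { durable: true, deadLetterExchange: 'orders.dlx' });
}
```

RabbitMQ rejects a redeclaration of an existing queue with different arguments, so an `orders` queue that was first declared without the dead-letter argument has to be deleted and recreated, or given the dead-letter exchange through a broker policy instead.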
Redis Streams (Lightweight)
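For a lightweight append-only log with consumer groups, where each message goes to one consumer in the group and stays pending until acknowledged: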
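import { createClient } from 'redis';

const client = createClient();
await client.connect();

// Create the consumer group once; Redis replies BUSYGROUP if it already exists.
try {
  await client.xGroupCreate('events', 'processors', '0', { MKSTREAM: true });
} catch (err) {
  if (!(err instanceof Error) || !err.message.includes('BUSYGROUP')) throw err;
}

while (true) {
  // The reply holds one entry per stream, each with its own list of messages.
  const streams = await client.xReadGroup('processors', 'worker-1', [
    { key: 'events', id: '>' },
  ], { COUNT: 10, BLOCK: 5000 });
  for (const stream of streams ?? []) {
    for (const msg of stream.messages) {
      await processEvent(msg); // msg is { id, message } in node-redis v4
      await client.xAck('events', 'processors', msg.id);
    }
  }
}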
Scheduled and Recurring Jobs
import { Queue } from 'bullmq';
const reports = new Queue('reports', { connection });
// Cron: every day at 2 AM
await reports.add('daily-summary', {}, {
  repeat: { pattern: '0 2 * * *' },
});
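BullMQ keeps repeat schedules in Redis rather than in process memory. Repeatable jobs with the same name and repeat options are deduplicated, so registering the same schedule from several instances should not produce duplicate runs. Changing the cron pattern registers a new schedule, so remove the old one explicitly. In-process schedulers, by contrast, need a single scheduler instance or a distributed lock to avoid duplicates across a cluster.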
Monitoring
| Metric | Why it matters |
|---|---|
| Queue depth | Backlog growing = workers underprovisioned |
| Processing time p95 | Slow jobs block concurrency slots |
| Failed job rate | Bugs or dependency failures |
| Retry count | Flaky downstream services |
Tools: Bull Board (UI), Prometheus exporters, Datadog integrations.
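As a sketch of how queue depth and failed-job counts could feed an alert, the helper below polls job counts and compares them with thresholds. It assumes an object shaped like BullMQ's `Queue#getJobCounts`, which resolves to a map of state name to count; `CountSource`, `Thresholds`, and `checkQueueHealth` are hypothetical names, and the limits are placeholders to tune per workload.

```typescript
// Structural view of BullMQ's Queue#getJobCounts; a Queue instance should fit this shape.
interface CountSource {
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}

interface Thresholds {
  maxWaiting: number; // backlog limit before workers count as underprovisioned
  maxFailed: number;  // failed jobs tolerated before alerting
}

// Hypothetical helper: returns human-readable alerts, empty when the queue looks healthy.
async function checkQueueHealth(queue: CountSource, limits: Thresholds): Promise<string[]> {
  const counts = await queue.getJobCounts('waiting', 'active', 'delayed', 'failed');
  const waiting = counts['waiting'] ?? 0;
  const failed = counts['failed'] ?? 0;
  const alerts: string[] = [];
  if (waiting > limits.maxWaiting) {
    alerts.push(`backlog: ${waiting} waiting jobs (limit ${limits.maxWaiting})`);
  }
  if (failed > limits.maxFailed) {
    alerts.push(`failures: ${failed} failed jobs (limit ${limits.maxFailed})`);
  }
  return alerts;
}
```

Calling this on a timer and exporting the raw counts as gauges gives the depth and failure metrics from the table; processing-time percentiles need per-job timing, which this sketch does not cover.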
Production Checklist
- Workers run as separate deployments from API
- Redis/RabbitMQ persisted and replicated
- Idempotent job handlers with deduplication keys
- DLQ with alerting on threshold
- Graceful shutdown: `worker.close()` on SIGTERM
- Payload size limits: store blobs in S3, pass references
- Schema versioning for job payloads
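The graceful-shutdown item in the checklist can be sketched as follows. The helper is written against two small structural interfaces so it does not depend on a specific library: a BullMQ `Worker` has a `close()` method that waits for active jobs, and Node's `process` exposes `once` for signals. `installGracefulShutdown`, `Closable`, and `SignalSource` are hypothetical names.

```typescript
// A BullMQ Worker fits Closable; Node's `process` fits SignalSource.
interface Closable {
  close(): Promise<void>;
}
interface SignalSource {
  once(event: string, listener: () => void): unknown;
}

function installGracefulShutdown(
  signals: SignalSource,
  worker: Closable,
  onDone: (code: number) => void,
): void {
  let closing = false;
  const shutdown = (): void => {
    if (closing) return; // ignore repeated signals
    closing = true;
    // close() stops taking new jobs and resolves once active jobs have finished.
    worker.close().then(
      () => onDone(0),
      () => onDone(1),
    );
  };
  signals.once('SIGTERM', shutdown);
  signals.once('SIGINT', shutdown);
}

// Typical wiring in a worker entry point (assumes a BullMQ `worker` in scope):
// installGracefulShutdown(process, worker, (code) => process.exit(code));
```

The orchestrator's termination grace period should be longer than the slowest job, otherwise the process is killed before `close()` resolves and the job is retried elsewhere.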
Message queues turn synchronous bottlenecks into manageable background work — essential for resilient Node.js backends at scale.