Message Queues: RabbitMQ vs Redis vs SQS
TL;DR
Use RabbitMQ for complex routing and guaranteed delivery. Use Redis for simple, fast queues. Use SQS for managed, scalable queues on AWS. All handle async tasks - choose based on complexity, scale, and infrastructure.
Our API was timing out. Image processing took 30 seconds, but requests timed out at 10. Users uploaded photos and got 504 errors even though processing succeeded.
We added Redis as a message queue. API returns immediately, background worker processes images. Response time: 30 seconds → 200ms. Users happy. Problem solved.
Six months later, we had a different problem. Lost jobs during deploys. Redis queue cleared when restarting. Critical payment webhooks disappeared. We switched to RabbitMQ with persistence and acknowledgments. Zero lost jobs since.
Message queues solve different problems. Here's how to choose between RabbitMQ, Redis, and SQS, with real examples and production lessons.
What Are Message Queues?
Message queues enable asynchronous processing - send work to be done later:
// Without queue (blocking)
app.post('/upload', async (req, res) => {
const image = req.file;
// This takes 30 seconds - request times out!
await processImage(image);
await generateThumbnails(image);
await runFaceDetection(image);
res.json({ success: true });
});
// With queue (non-blocking)
app.post('/upload', async (req, res) => {
const image = req.file;
// Add job to queue - instant
await queue.add('process-image', { imageId: image.id });
res.json({ success: true }); // Returns in 200ms
});
// Worker processes jobs in background
queue.process('process-image', async (job) => {
const { imageId } = job.data;
await processImage(imageId);
await generateThumbnails(imageId);
await runFaceDetection(imageId);
});
Benefits:
- Fast API responses (offload slow work)
- Reliability (retry failed jobs)
- Scalability (add more workers)
- Decoupling (API and workers independent)
- Load leveling (handle traffic spikes)
When to Use Message Queues
1. Long-Running Tasks
// Jobs that take too long for HTTP requests
queue.add('generate-report', { userId, month });
queue.add('export-data', { format: 'csv', filters });
queue.add('send-email', { to, subject, body });
queue.add('process-video', { videoId });
2. Background Processing
// Tasks that don't need immediate results
queue.add('update-search-index', { documentId });
queue.add('cleanup-old-files', { olderThan: '30d' });
queue.add('calculate-analytics', { date: today });
3. Microservices Communication
// Service A publishes event
await eventBus.publish('user.created', { userId, email });
// Service B subscribes
eventBus.subscribe('user.created', async (data) => {
await sendWelcomeEmail(data.email);
});
// Service C also subscribes
eventBus.subscribe('user.created', async (data) => {
await createAnalyticsProfile(data.userId);
});
4. Rate Limiting External APIs
// Prevent hitting rate limits - in Bull, the rate limiter is a queue option
const apiQueue = new Bull('external-api', {
  limiter: {
    max: 10,       // 10 jobs
    duration: 1000 // per second
  }
});
await apiQueue.add({ endpoint, data });
5. Scheduled Tasks
// Cron-like jobs
queue.add('send-daily-digest', {}, {
repeat: { cron: '0 9 * * *' } // Every day at 9am
});
RabbitMQ: The Full-Featured Solution
Best for: Complex routing, guaranteed delivery, enterprise systems
Strengths
- Reliable: Persistent queues, acknowledgments, guaranteed delivery
- Flexible routing: Exchanges, bindings, routing keys
- Multiple protocols: AMQP, MQTT, STOMP
- Message priority: High-priority jobs first
- Dead letter queues: Handle failed messages
- Clustering: High availability
Setup
# Install RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
Basic Queue (Node.js)
const amqp = require('amqplib');
// Producer
async function sendJob(queueName, data) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queueName, { durable: true });
channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(data)),
{ persistent: true } // Survive RabbitMQ restart
);
console.log('Sent:', data);
await channel.close();
await connection.close();
}
// Consumer
async function processJobs(queueName) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queueName, { durable: true });
channel.prefetch(1); // Process one at a time
channel.consume(queueName, async (msg) => {
  if (!msg) return; // Consumer was cancelled by the broker
  const data = JSON.parse(msg.content.toString());
  console.log('Processing:', data);
  try {
    await processImage(data);
    channel.ack(msg); // Acknowledge success
  } catch (err) {
    console.error('Failed:', err);
    // Requeue on failure - beware: a poison message will loop forever without a DLQ
    channel.nack(msg, false, true);
  }
});
}
// Usage
await sendJob('image-processing', { imageId: 123 });
await processJobs('image-processing');
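The lost-jobs-on-deploy problem from the intro can bite RabbitMQ workers too if a deploy kills a process mid-job. A minimal shutdown sketch, assuming the amqplib setup above (`parseJob` is a small helper added here for clarity; unacked messages are requeued by the broker when the channel closes):

```javascript
// Sketch: graceful worker shutdown so in-flight jobs finish before exit.
function parseJob(msg) {
  return JSON.parse(msg.content.toString());
}

async function startWorker(queueName, handler) {
  const amqp = require('amqplib'); // required lazily so the helper above stands alone
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue(queueName, { durable: true });
  channel.prefetch(1);

  const { consumerTag } = await channel.consume(queueName, async (msg) => {
    if (!msg) return;
    await handler(parseJob(msg));
    channel.ack(msg);
  });

  // SIGTERM is what most deploy tools send: stop taking new messages,
  // then close. Anything unacked goes back to the queue, not into the void.
  process.once('SIGTERM', async () => {
    await channel.cancel(consumerTag);
    await channel.close();
    await connection.close();
    process.exit(0);
  });
}
```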
Pub/Sub Pattern
// Publisher
async function publishEvent(exchange, routingKey, data) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange(exchange, 'topic', { durable: true });
channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(data))
);
console.log('Published:', routingKey, data);
}
// Subscriber
async function subscribe(exchange, pattern, handler) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange(exchange, 'topic', { durable: true });
const q = await channel.assertQueue('', { exclusive: true });
channel.bindQueue(q.queue, exchange, pattern);
channel.consume(q.queue, (msg) => {
const data = JSON.parse(msg.content.toString());
handler(msg.fields.routingKey, data);
}, { noAck: true });
}
// Usage
await publishEvent('events', 'user.created', { userId: 123 });
await publishEvent('events', 'user.deleted', { userId: 456 });
// Subscribe to all user events
await subscribe('events', 'user.*', (key, data) => {
console.log('Event:', key, data);
});
// Subscribe only to user.created
await subscribe('events', 'user.created', (key, data) => {
console.log('User created:', data);
});
Priority Queue
// Queue with priorities (0-10)
await channel.assertQueue('tasks', {
durable: true,
maxPriority: 10
});
// Send high-priority job
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), {
persistent: true,
priority: 9 // High priority
});
// Send low-priority job
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), {
persistent: true,
priority: 1 // Low priority
});
// High-priority jobs processed first
Dead Letter Queue
// Main queue with DLQ
await channel.assertQueue('main-queue', {
durable: true,
deadLetterExchange: 'dlx',
deadLetterRoutingKey: 'failed'
});
// Dead letter queue
await channel.assertExchange('dlx', 'direct');
await channel.assertQueue('failed-queue', { durable: true });
await channel.bindQueue('failed-queue', 'dlx', 'failed');
// Messages land in failed-queue when a consumer rejects them without requeue
// (nack with requeue=false), when they expire, or when the queue overflows.
// RabbitMQ itself doesn't count retries - cap them yourself (see below)
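Since RabbitMQ won't count retries for you, one common way to cap them is to read the `x-death` header the broker adds each time a message is dead-lettered (for example when bouncing through a TTL-based retry queue back to the main queue). A hedged sketch; `MAX_RETRIES` and the parking behavior are choices, not RabbitMQ defaults:

```javascript
// Sketch: cap retries by summing RabbitMQ's x-death header entries,
// which record how many times a message has been dead-lettered.
function deathCount(msg) {
  const deaths = (msg.properties.headers || {})['x-death'] || [];
  // Each entry carries a `count`; sum them for total dead-letter passes
  return deaths.reduce((total, d) => total + Number(d.count || 0), 0);
}

const MAX_RETRIES = 3;

function makeHandler(channel, process) {
  return async (msg) => {
    if (!msg) return;
    try {
      await process(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    } catch (err) {
      if (deathCount(msg) >= MAX_RETRIES) {
        channel.ack(msg); // Give up: ack and log/park it instead of looping forever
      } else {
        channel.nack(msg, false, false); // Dead-letter -> retry queue -> back here
      }
    }
  };
}
```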
Redis: The Fast and Simple Option
Best for: Simple queues, high throughput, already using Redis
Strengths
- Fast: In-memory, microsecond latency
- Simple: Easy to set up and use
- Versatile: Lists, Pub/Sub, Streams
- Already there: If you use Redis for caching
- Lightweight: No separate infrastructure
Limitations
- Not durable: Data in memory (can enable persistence)
- No guaranteed delivery: Can lose messages on crash
- Simple routing: No complex routing like RabbitMQ
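The "(can enable persistence)" caveat above refers to Redis's append-only file. A minimal redis.conf sketch (the fsync policy is a tradeoff you tune, not a universal default):

```conf
# Append-only file: log every write so queue contents survive a restart
appendonly yes
# fsync once per second - bounded loss window, reasonable throughput
appendfsync everysec
```

Even with AOF on, a hard crash can still lose up to a second of writes under `everysec`, so treat this as damage limitation, not a delivery guarantee.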
Redis Lists (Simple Queue)
const Redis = require('ioredis');
const redis = new Redis();
// Producer
async function addJob(queue, data) {
await redis.lpush(queue, JSON.stringify(data));
console.log('Added job:', data);
}
// Consumer
async function processJobs(queue) {
while (true) {
// BRPOP blocks until job available
const result = await redis.brpop(queue, 0);
const data = JSON.parse(result[1]);
console.log('Processing:', data);
await processImage(data);
}
}
// Usage
await addJob('image-queue', { imageId: 123 });
await processJobs('image-queue');
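The BRPOP loop above has the exact failure mode from the intro: a crash between popping and processing loses the job. The classic fix is the "reliable queue" pattern, sketched here with BLMOVE (Redis >= 6.2); the per-worker backup list name is a convention chosen for this example:

```javascript
// Sketch: BLMOVE atomically pops a job AND pushes it to a backup list,
// so a crash mid-job leaves the job recoverable instead of gone.
function processingList(queue, workerId) {
  return `${queue}:processing:${workerId}`;
}

async function processJobsReliably(queue, workerId, handler) {
  const Redis = require('ioredis'); // lazy require keeps the helper above standalone
  const redis = new Redis();
  const backup = processingList(queue, workerId);

  // On startup, re-process anything left over from a previous crash
  let leftover;
  while ((leftover = await redis.rpop(backup))) {
    await handler(JSON.parse(leftover));
  }

  while (true) {
    // Pop from the queue's right end (matching LPUSH/BRPOP above)
    // and push to the backup list in one atomic step
    const raw = await redis.blmove(queue, backup, 'RIGHT', 'LEFT', 0);
    await handler(JSON.parse(raw));
    await redis.lrem(backup, 1, raw); // Done - clear it from the backup
  }
}
```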
Redis with Bull (Recommended)
const Bull = require('bull');
// Create queue
const imageQueue = new Bull('image-processing', {
redis: { host: 'localhost', port: 6379 }
});
// Producer
await imageQueue.add({
imageId: 123,
userId: 456
}, {
attempts: 3, // Retry 3 times
backoff: {
type: 'exponential',
delay: 2000
}
});
// Consumer
imageQueue.process(async (job) => {
console.log('Processing:', job.data);
await processImage(job.data.imageId);
return { success: true };
});
// Event listeners
imageQueue.on('completed', (job, result) => {
console.log('Job completed:', job.id, result);
});
imageQueue.on('failed', (job, err) => {
console.error('Job failed:', job.id, err);
});
// Scheduled jobs
await imageQueue.add({ imageId: 789 }, {
delay: 60000 // Process in 60 seconds
});
// Recurring jobs
await imageQueue.add({ task: 'cleanup' }, {
repeat: { cron: '0 * * * *' } // Every hour
});
Redis Pub/Sub
const Redis = require('ioredis');
const pub = new Redis();
const sub = new Redis();
// Subscribe
sub.subscribe('user-events', (err, count) => {
console.log('Subscribed to', count, 'channels');
});
sub.on('message', (channel, message) => {
const data = JSON.parse(message);
console.log('Received:', channel, data);
});
// Publish
await pub.publish('user-events', JSON.stringify({
type: 'user.created',
userId: 123
}));
Limitation: Pub/Sub doesn't persist messages - if a subscriber is down, its messages are lost.
Redis Streams (Better Pub/Sub)
const Redis = require('ioredis');
const redis = new Redis();
// Producer
await redis.xadd(
'events',
'*', // Auto-generate ID
'type', 'user.created',
'userId', '123'
);
// Consumer group (persists each consumer's position;
// CREATE throws BUSYGROUP if the group already exists)
try {
  await redis.xgroup('CREATE', 'events', 'processors', '0', 'MKSTREAM');
} catch (err) {
  if (!String(err.message).includes('BUSYGROUP')) throw err;
}
// Consumer
while (true) {
const results = await redis.xreadgroup(
'GROUP', 'processors', 'worker-1',
'COUNT', 10,
'BLOCK', 5000,
'STREAMS', 'events', '>'
);
if (results) {
const [stream, messages] = results[0];
for (const [id, fields] of messages) {
console.log('Processing:', id, fields);
await processEvent(fields);
// Acknowledge
await redis.xack('events', 'processors', id);
}
}
}
Streams are better: Messages persist, multiple consumers, acknowledgments.
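One more stream detail worth knowing: if a consumer reads a message but dies before XACK, the message sits in the group's pending list forever. A hedged recovery sketch using XAUTOCLAIM (Redis >= 6.2); `fieldsToObject` is a helper for the flat field arrays streams return, and the 60s idle threshold is a choice for this example:

```javascript
// Sketch: reclaim messages a dead consumer read but never acked.
// Streams return fields as a flat [key, value, key, value, ...] array.
function fieldsToObject(fields) {
  const obj = {};
  for (let i = 0; i < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  return obj;
}

async function reclaimStale(streamKey, group, consumer, handler) {
  const Redis = require('ioredis');
  const redis = new Redis();
  // Claim messages pending for more than 60s, scanning from the start
  const [, messages] = await redis.xautoclaim(
    streamKey, group, consumer, 60000, '0'
  );
  for (const [id, fields] of messages) {
    await handler(fieldsToObject(fields));
    await redis.xack(streamKey, group, id);
  }
}
```

Run this periodically (or on worker startup) alongside the main XREADGROUP loop so crashed workers' messages get picked back up.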
AWS SQS: The Managed Solution
Best for: AWS infrastructure, scalability, no ops overhead
Strengths
- Fully managed: No servers to maintain
- Scalable: Handles millions of messages
- Reliable: Distributed, redundant
- Simple: Easy API
- Cost-effective: Pay per request
- Integrated: Works with Lambda, ECS, etc.
Limitations
- Latency: Higher than Redis/RabbitMQ (network)
- Vendor lock-in: AWS only
- Limited routing: Simple queues only (use SNS for pub/sub)
- Cost: Can be expensive at scale
Standard Queue
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
const sqs = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/image-queue';
// Send message
async function sendMessage(data) {
await sqs.send(new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(data)
}));
console.log('Sent:', data);
}
// Receive and process
async function processMessages() {
while (true) {
const response = await sqs.send(new ReceiveMessageCommand({
QueueUrl: queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20 // Long polling
}));
if (response.Messages) {
for (const message of response.Messages) {
const data = JSON.parse(message.Body);
console.log('Processing:', data);
try {
await processImage(data);
// Delete after successful processing
await sqs.send(new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}));
} catch (err) {
console.error('Failed:', err);
// Message will return to queue after visibility timeout
}
}
}
}
}
// Usage
await sendMessage({ imageId: 123 });
await processMessages();
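The visibility timeout mentioned in the catch branch cuts both ways: if processing outlasts it, SQS redelivers the message while you're still working on it. For long jobs, a common pattern is a "heartbeat" that extends visibility while work is in progress. A sketch - `ChangeMessageVisibility` is a real SQS API, but the renew-at-half-timeout cadence is an assumption:

```javascript
// Sketch: keep a long-running job invisible to other consumers by
// periodically resetting its visibility timeout until we're done.
function heartbeatIntervalMs(visibilityTimeoutSec) {
  // Renew at half the timeout so it never lapses mid-job
  return (visibilityTimeoutSec / 2) * 1000;
}

async function processWithHeartbeat(sqs, queueUrl, message, visibilityTimeoutSec, handler) {
  const { ChangeMessageVisibilityCommand, DeleteMessageCommand } =
    require('@aws-sdk/client-sqs');

  const timer = setInterval(() => {
    sqs.send(new ChangeMessageVisibilityCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle,
      VisibilityTimeout: visibilityTimeoutSec // Reset the clock
    })).catch(console.error);
  }, heartbeatIntervalMs(visibilityTimeoutSec));

  try {
    await handler(JSON.parse(message.Body));
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle
    }));
  } finally {
    clearInterval(timer); // Stop heartbeating whether we succeeded or failed
  }
}
```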
FIFO Queue (Order Guaranteed)
// Create FIFO queue (ends with .fifo)
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo';
// Send with deduplication
await sqs.send(new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(data),
MessageGroupId: 'order-123', // Messages with same ID are ordered
MessageDeduplicationId: 'unique-id' // Prevents duplicates within 5 minutes
}));
Dead Letter Queue
// Create main queue with DLQ
// Set via AWS Console, CloudFormation, or CreateQueue
// (SQS expects RedrivePolicy as a JSON string attribute)
{
  "QueueName": "main-queue",
  "RedrivePolicy": {
    "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789:failed-queue",
    "maxReceiveCount": "3"
  }
}
// After 3 failed receive attempts, SQS moves the message to the DLQ
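The same redrive policy can be set from code. A sketch using the SDK's `CreateQueueCommand` (account ID and ARN are placeholders, and the DLQ must already exist):

```javascript
// Sketch: create the main queue with a redrive policy pointing at an
// existing DLQ. Note RedrivePolicy must be passed as a JSON *string*.
function redrivePolicy(dlqArn, maxReceiveCount) {
  return JSON.stringify({
    deadLetterTargetArn: dlqArn,
    maxReceiveCount: String(maxReceiveCount)
  });
}

async function createMainQueue() {
  const { SQSClient, CreateQueueCommand } = require('@aws-sdk/client-sqs');
  const sqs = new SQSClient({ region: 'us-east-1' });
  const dlqArn = 'arn:aws:sqs:us-east-1:123456789:failed-queue'; // placeholder
  return sqs.send(new CreateQueueCommand({
    QueueName: 'main-queue',
    Attributes: { RedrivePolicy: redrivePolicy(dlqArn, 3) }
  }));
}
```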
SNS + SQS (Pub/Sub)
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');
// Publish to SNS topic
const sns = new SNSClient({ region: 'us-east-1' });
await sns.send(new PublishCommand({
TopicArn: 'arn:aws:sns:us-east-1:123456789:user-events',
Message: JSON.stringify({ type: 'user.created', userId: 123 })
}));
// Multiple SQS queues subscribe to SNS topic
// Each gets a copy of the message
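Fan-out doesn't have to mean every queue sees every message: SNS subscription filter policies route on message attributes, so filtering happens before messages reach a queue. A hedged sketch (ARNs are placeholders; the publisher must set a matching `type` message attribute for the filter to apply):

```javascript
// Sketch: subscribe an SQS queue to an SNS topic, but only for
// messages whose "type" attribute matches the filter policy.
function filterPolicyFor(types) {
  return JSON.stringify({ type: types });
}

async function subscribeWithFilter() {
  const { SNSClient, SubscribeCommand } = require('@aws-sdk/client-sns');
  const sns = new SNSClient({ region: 'us-east-1' });
  return sns.send(new SubscribeCommand({
    TopicArn: 'arn:aws:sns:us-east-1:123456789:user-events',            // placeholder
    Protocol: 'sqs',
    Endpoint: 'arn:aws:sqs:us-east-1:123456789:welcome-email-queue',    // placeholder
    Attributes: { FilterPolicy: filterPolicyFor(['user.created']) }
  }));
}
```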
Comparison Table
| Feature | RabbitMQ | Redis | AWS SQS |
|---|---|---|---|
| Speed | Fast (10k-50k msg/s) | Fastest (100k+ msg/s) | Moderate (network latency) |
| Reliability | Excellent (persistent, ack) | Good (can lose messages) | Excellent (managed, redundant) |
| Durability | Yes (disk persistence) | Optional (mostly in-memory) | Yes (distributed storage) |
| Ordering | Yes (per queue) | Yes (lists/streams) | FIFO queues only |
| Routing | Complex (exchanges, bindings) | Simple | Simple (SNS for pub/sub) |
| Scaling | Clustering required | Single instance/cluster | Automatic |
| Operations | Self-hosted (complex) | Self-hosted (simple) | Fully managed (zero ops) |
| Cost | Server costs | Server costs | Pay per request |
| Best for | Complex routing, guarantees | Speed, simplicity | AWS infrastructure, scale |
When to Use Each
Use RabbitMQ When:
✓ Need guaranteed delivery
✓ Complex routing requirements
✓ Multiple consumers with different patterns
✓ Enterprise/critical systems
✓ Need message priorities
✓ Already have RabbitMQ expertise
✓ On-premise infrastructure
Examples:
- Payment processing
- Order fulfillment
- Critical notifications
- Multi-tenant systems
- Microservices with complex routing
Use Redis When:
✓ Already using Redis for caching
✓ Need very high throughput
✓ Simple queue requirements
✓ Low latency critical
✓ Temporary/non-critical data
✓ Want simple setup
Examples:
- Session storage + job queue
- Real-time analytics
- Rate limiting
- Simple background tasks
- Cache invalidation
Use AWS SQS When:
✓ On AWS infrastructure
✓ Want zero operations overhead
✓ Need auto-scaling
✓ Don't need sub-second latency
✓ Integrating with Lambda/ECS
✓ Want cost-effective at scale
Examples:
- Serverless architectures
- Event-driven systems on AWS
- Image/video processing
- Data pipeline triggers
- Microservices on ECS/EKS
Real-World Example: Image Processing
RabbitMQ Approach
// API: Upload image
app.post('/upload', async (req, res) => {
const image = await saveImage(req.file);
// Send to RabbitMQ with priority
await channel.sendToQueue('images', Buffer.from(JSON.stringify({
imageId: image.id,
type: 'thumbnail'
})), {
persistent: true,
priority: 5
});
res.json({ imageId: image.id, status: 'processing' });
});
// Worker: Process images (amqplib - prefetch 10 for parallel consumption)
channel.prefetch(10);
channel.consume('images', async (msg) => {
  if (!msg) return;
  const { imageId, type } = JSON.parse(msg.content.toString());
  try {
    if (type === 'thumbnail') {
      await generateThumbnail(imageId);
    }
    channel.ack(msg);
  } catch (err) {
    channel.nack(msg, false, false); // Don't requeue - send to DLQ
  }
});
// Scale: Run 10 workers on different servers
// All consume from same queue
Redis/Bull Approach
const imageQueue = new Bull('images');
// API: Upload image
app.post('/upload', async (req, res) => {
const image = await saveImage(req.file);
await imageQueue.add({
imageId: image.id,
type: 'thumbnail'
}, {
attempts: 3,
backoff: 5000
});
res.json({ imageId: image.id, status: 'processing' });
});
// Worker: Process images
imageQueue.process(10, async (job) => {
const { imageId, type } = job.data;
if (type === 'thumbnail') {
await generateThumbnail(imageId);
}
return { success: true };
});
// Progress tracking (an alternative processor, replacing the one above -
// Bull allows only one process() call per queue)
imageQueue.process(async (job) => {
await job.progress(0);
await processStep1(job.data);
await job.progress(50);
await processStep2(job.data);
await job.progress(100);
});
// API: Check status
app.get('/status/:jobId', async (req, res) => {
  const job = await imageQueue.getJob(req.params.jobId);
  if (!job) return res.status(404).json({ error: 'Job not found' });
  res.json({
    status: await job.getState(),
    progress: job.progress()
  });
});
AWS SQS Approach
// API: Upload image
app.post('/upload', async (req, res) => {
const image = await saveImage(req.file);
await sqs.send(new SendMessageCommand({
QueueUrl: process.env.IMAGE_QUEUE_URL,
MessageBody: JSON.stringify({
imageId: image.id,
type: 'thumbnail'
})
}));
res.json({ imageId: image.id, status: 'processing' });
});
// Lambda worker (triggered by SQS)
exports.handler = async (event) => {
for (const record of event.Records) {
const { imageId, type } = JSON.parse(record.body);
if (type === 'thumbnail') {
await generateThumbnail(imageId);
}
}
};
// Scale: Lambda auto-scales with queue depth
// No worker management needed
Migration Path
From Synchronous to Async
// Phase 1: Synchronous (current)
app.post('/process', async (req, res) => {
const result = await heavyOperation(req.body);
res.json(result);
});
// Phase 2: Add queue (dual mode)
app.post('/process', async (req, res) => {
if (req.query.async === 'true') {
// New: Async mode
const job = await queue.add('process', req.body); // Bull's add() returns a Job
res.json({ jobId: job.id, status: 'processing' });
} else {
// Old: Sync mode (keep for backwards compatibility)
const result = await heavyOperation(req.body);
res.json(result);
}
});
// Phase 3: Default to async
app.post('/process', async (req, res) => {
const job = await queue.add('process', req.body);
res.json({ jobId: job.id, status: 'processing' });
});
// Phase 4: Remove sync mode entirely
From Redis to RabbitMQ
// Wrapper to abstract queue implementation
class QueueAdapter {
async add(queue, data) {
if (USE_RABBITMQ) {
await this.rabbitmq.sendToQueue(queue, Buffer.from(JSON.stringify(data)));
} else {
await this.redis.lpush(queue, JSON.stringify(data));
}
}
}
// Gradually migrate queues
// Week 1: Non-critical queues
// Week 2: Most queues
// Week 3: Critical queues
// Week 4: Remove Redis queue code
Common Mistakes
Mistake 1: Not Handling Failures
// BAD - No retry strategy
queue.process(async (job) => {
  await processImage(job.data.imageId);
  // If this throws and no attempts were configured,
  // one transient error marks the job failed for good
});
// GOOD - Retry, then record the failure on the last attempt
queue.process(async (job) => {
  try {
    await processImage(job.data.imageId);
  } catch (err) {
    if (job.attemptsMade < 3) {
      throw err; // Re-throw so Bull retries the job
    }
    await logFailure(job, err);
    // Swallow the error on the final attempt - the job
    // completes instead of retrying forever
  }
});
Mistake 2: Not Setting Timeouts
// BAD - Job runs forever
queue.process(async (job) => {
await longRunningTask(); // Might hang
});
// GOOD - Set timeout
queue.process(async (job) => {
return Promise.race([
longRunningTask(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), 60000)
)
]);
});
Mistake 3: Not Monitoring Queue Depth
// Monitor queue size
setInterval(async () => {
const counts = await queue.getJobCounts();
if (counts.waiting > 1000) {
console.warn('Queue backing up:', counts);
// Alert, add workers, etc.
}
}, 60000);
Mistake 4: Processing Jobs Twice
// BAD - No idempotency
queue.process(async (job) => {
await db.insert('payments', { amount: 100 }); // Duplicate if retried!
});
// GOOD - Idempotent
queue.process(async (job) => {
const { paymentId } = job.data;
const existing = await db.findOne('payments', { id: paymentId });
if (existing) {
return; // Already processed
}
await db.insert('payments', { id: paymentId, amount: 100 });
});
Mistake 5: Not Scaling Workers
// BAD - Single worker
queue.process(async (job) => { /* ... */ });
// GOOD - Multiple workers
const CONCURRENCY = 10;
queue.process(CONCURRENCY, async (job) => { /* ... */ });
// BETTER - Scale worker processes on queue depth
// (Bull binds one processor per queue per process, so scale out by
// launching more worker processes - scaleWorkers() is a placeholder
// for your orchestrator's scale-up call)
setInterval(async () => {
  const counts = await queue.getJobCounts();
  const desired = Math.min(Math.ceil(counts.waiting / 100), 50);
  await scaleWorkers(desired);
}, 60000);
The Bottom Line
Message queues solve real problems: slow APIs, lost jobs, tight coupling. Choose based on your needs.
RabbitMQ for complex routing and guaranteed delivery. Best for critical systems.
Redis for speed and simplicity. Best when you already use Redis and need simple queues.
AWS SQS for managed, scalable queues. Best on AWS with zero ops overhead.
We started with Redis for simplicity. Lost jobs during deploys. Switched to RabbitMQ for persistence and guarantees. Now using SQS on AWS for auto-scaling without managing servers.
Pick the right tool for your requirements. Simple queues? Redis. Complex routing? RabbitMQ. On AWS? SQS. All three work - choose based on reliability needs, infrastructure, and operational overhead.