Message Queues: RabbitMQ vs Redis vs SQS

TL;DR

Use RabbitMQ for complex routing and guaranteed delivery. Use Redis for simple, fast queues. Use SQS for managed, scalable queues on AWS. All handle async tasks - choose based on complexity, scale, and infrastructure.

Our API was timing out. Image processing took 30 seconds, but requests timed out at 10. Users uploaded photos and got 504 errors even though processing succeeded.

We added Redis as a message queue. API returns immediately, background worker processes images. Response time: 30 seconds → 200ms. Users happy. Problem solved.

Six months later, we had a different problem. Lost jobs during deploys. Redis queue cleared when restarting. Critical payment webhooks disappeared. We switched to RabbitMQ with persistence and acknowledgments. Zero lost jobs since.

Message queues solve different problems. Here's how to choose between RabbitMQ, Redis, and SQS, with real examples and production lessons.

What Are Message Queues?

Message queues enable asynchronous processing - send work to be done later:

// Without queue (blocking)
app.post('/upload', async (req, res) => {
    const image = req.file;

    // This takes 30 seconds - request times out!
    await processImage(image);
    await generateThumbnails(image);
    await runFaceDetection(image);

    res.json({ success: true });
});

// With queue (non-blocking)
app.post('/upload', async (req, res) => {
    const image = req.file;

    // Add job to queue - instant
    await queue.add('process-image', { imageId: image.id });

    res.json({ success: true }); // Returns in 200ms
});

// Worker processes jobs in background
queue.process('process-image', async (job) => {
    const { imageId } = job.data;
    await processImage(imageId);
    await generateThumbnails(imageId);
    await runFaceDetection(imageId);
});

Benefits:

  • Fast API responses (offload slow work)
  • Reliability (retry failed jobs)
  • Scalability (add more workers)
  • Decoupling (API and workers independent)
  • Load leveling (handle traffic spikes)
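The decoupling above can be sketched with a tiny in-memory queue (illustrative only - no persistence, so jobs vanish on restart, which is exactly the failure mode described in the intro):

```javascript
// Minimal in-memory queue sketch: producers return immediately,
// a worker drains jobs in FIFO order later.
class TinyQueue {
    constructor() {
        this.jobs = [];
        this.handlers = new Map();
    }

    add(name, data) {
        this.jobs.push({ name, data }); // producer returns immediately
    }

    process(name, handler) {
        this.handlers.set(name, handler); // worker registers a handler
    }

    async drain() {
        while (this.jobs.length > 0) {
            const job = this.jobs.shift(); // FIFO: oldest job first
            await this.handlers.get(job.name)(job);
        }
    }
}
```

Bull, RabbitMQ, and SQS all add what this sketch lacks: durability, acknowledgments, retries, and multiple concurrent workers.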

When to Use Message Queues

1. Long-Running Tasks

// Jobs that take too long for HTTP requests
queue.add('generate-report', { userId, month });
queue.add('export-data', { format: 'csv', filters });
queue.add('send-email', { to, subject, body });
queue.add('process-video', { videoId });

2. Background Processing

// Tasks that don't need immediate results
queue.add('update-search-index', { documentId });
queue.add('cleanup-old-files', { olderThan: '30d' });
queue.add('calculate-analytics', { date: today });

3. Microservices Communication

// Service A publishes event
await eventBus.publish('user.created', { userId, email });

// Service B subscribes
eventBus.subscribe('user.created', async (data) => {
    await sendWelcomeEmail(data.email);
});

// Service C also subscribes
eventBus.subscribe('user.created', async (data) => {
    await createAnalyticsProfile(data.userId);
});

4. Rate Limiting External APIs

// Prevent hitting rate limits
queue.add('call-external-api', { endpoint, data }, {
    rateLimiter: {
        max: 10,      // 10 jobs
        duration: 1000 // per second
    }
});
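The rateLimiter option above is Bull's built-in feature. The underlying idea can be sketched as a fixed-window limiter (a simplification for illustration, not Bull's implementation):

```javascript
// Fixed-window rate limiter: allow at most `max` jobs per `durationMs`.
function createRateLimiter(max, durationMs) {
    let remaining = max;
    let windowStart = 0;

    return function tryAcquire(now) {
        if (now - windowStart >= durationMs) {
            remaining = max;   // new window: refill the allowance
            windowStart = now;
        }
        if (remaining > 0) {
            remaining -= 1;
            return true;       // run the job
        }
        return false;          // over the limit: delay or requeue
    };
}

const allow = createRateLimiter(10, 1000); // 10 jobs per second
```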

5. Scheduled Tasks

// Cron-like jobs
queue.add('send-daily-digest', {}, {
    repeat: { cron: '0 9 * * *' } // Every day at 9am
});

RabbitMQ: The Full-Featured Solution

Best for: Complex routing, guaranteed delivery, enterprise systems

Strengths

  • Reliable: Persistent queues, acknowledgments, guaranteed delivery
  • Flexible routing: Exchanges, bindings, routing keys
  • Multiple protocols: AMQP, MQTT, STOMP
  • Message priority: High-priority jobs first
  • Dead letter queues: Handle failed messages
  • Clustering: High availability

Setup

# Install RabbitMQ
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Basic Queue (Node.js)

const amqp = require('amqplib');

// Producer
async function sendJob(queueName, data) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    await channel.assertQueue(queueName, { durable: true });

    channel.sendToQueue(
        queueName,
        Buffer.from(JSON.stringify(data)),
        { persistent: true } // Survive RabbitMQ restart
    );

    console.log('Sent:', data);
    await channel.close();
    await connection.close();
}

// Consumer
async function processJobs(queueName) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    await channel.assertQueue(queueName, { durable: true });
    channel.prefetch(1); // Process one at a time

    channel.consume(queueName, async (msg) => {
        if (msg === null) return; // consumer cancelled by the server

        const data = JSON.parse(msg.content.toString());
        console.log('Processing:', data);

        try {
            await processImage(data);
            channel.ack(msg); // Acknowledge success
        } catch (err) {
            console.error('Failed:', err);
            channel.nack(msg, false, true); // Requeue on failure
        }
    });
}

// Usage
await sendJob('image-processing', { imageId: 123 });
await processJobs('image-processing');

Pub/Sub Pattern

// Publisher
async function publishEvent(exchange, routingKey, data) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: true });

    channel.publish(
        exchange,
        routingKey,
        Buffer.from(JSON.stringify(data))
    );

    console.log('Published:', routingKey, data);
}

// Subscriber
async function subscribe(exchange, pattern, handler) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: true });

    const q = await channel.assertQueue('', { exclusive: true });
    await channel.bindQueue(q.queue, exchange, pattern);

    channel.consume(q.queue, (msg) => {
        const data = JSON.parse(msg.content.toString());
        handler(msg.fields.routingKey, data);
    }, { noAck: true });
}

// Usage
await publishEvent('events', 'user.created', { userId: 123 });
await publishEvent('events', 'user.deleted', { userId: 456 });

// Subscribe to all user events
await subscribe('events', 'user.*', (key, data) => {
    console.log('Event:', key, data);
});

// Subscribe only to user.created
await subscribe('events', 'user.created', (key, data) => {
    console.log('User created:', data);
});
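The patterns above follow AMQP topic-matching rules: `*` matches exactly one dot-separated word, `#` matches zero or more. A sketch of that matching logic (the rule itself, not RabbitMQ's internals):

```javascript
// AMQP topic matching: '*' = exactly one word, '#' = zero or more words.
function topicMatches(pattern, key) {
    const p = pattern.split('.');
    const k = key.split('.');

    function match(i, j) {
        if (i === p.length) return j === k.length;
        if (p[i] === '#') {
            // '#' can absorb zero or more words
            for (let skip = j; skip <= k.length; skip++) {
                if (match(i + 1, skip)) return true;
            }
            return false;
        }
        if (j === k.length) return false;
        if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
        return false;
    }

    return match(0, 0);
}
```

So `user.*` matches `user.created` but not `user.created.v2`, while `user.#` matches both.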

Priority Queue

// Queue with priorities (0-10)
await channel.assertQueue('tasks', {
    durable: true,
    maxPriority: 10
});

// Send high-priority job
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), {
    persistent: true,
    priority: 9 // High priority
});

// Send low-priority job
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), {
    persistent: true,
    priority: 1 // Low priority
});

// High-priority jobs processed first
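"Processed first" means higher priority dequeues ahead while equal priorities stay FIFO. A stable-insert sketch of that ordering (RabbitMQ's actual implementation is more involved):

```javascript
// Priority ordering sketch: higher numbers dequeue first,
// equal priorities keep insertion (FIFO) order.
class PriorityQueue {
    constructor() {
        this.items = [];
    }

    enqueue(data, priority) {
        // insert before the first item with strictly lower priority,
        // so equal-priority items stay in arrival order
        const i = this.items.findIndex((it) => it.priority < priority);
        if (i === -1) this.items.push({ data, priority });
        else this.items.splice(i, 0, { data, priority });
    }

    dequeue() {
        return this.items.shift()?.data;
    }
}
```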

Dead Letter Queue

// Main queue with DLQ
await channel.assertQueue('main-queue', {
    durable: true,
    deadLetterExchange: 'dlx',
    deadLetterRoutingKey: 'failed'
});

// Dead letter queue
await channel.assertExchange('dlx', 'direct');
await channel.assertQueue('failed-queue', { durable: true });
await channel.bindQueue('failed-queue', 'dlx', 'failed');

// Messages rejected without requeue (channel.nack(msg, false, false))
// are routed to failed-queue via the dead letter exchange
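RabbitMQ dead-letters a message when a consumer rejects it without requeue; if you want "give up after N attempts", the consumer has to track the attempt count itself, for example in a header. A sketch (`handleFailure` and the `x-attempts` header are illustrative, not amqplib built-ins):

```javascript
// Give up after MAX_ATTEMPTS: republish with a counter header,
// then dead-letter once the counter runs out.
const MAX_ATTEMPTS = 3;

function handleFailure(msg, channel) {
    const attempts = (msg.properties.headers?.['x-attempts'] || 0) + 1;

    if (attempts >= MAX_ATTEMPTS) {
        channel.nack(msg, false, false); // requeue=false -> dead-lettered
        return 'dead-lettered';
    }

    // republish with the incremented counter instead of a plain requeue
    // ('' is the default exchange, routing straight back to the queue)
    channel.publish('', msg.fields.routingKey, msg.content, {
        persistent: true,
        headers: { 'x-attempts': attempts }
    });
    channel.ack(msg); // the original delivery is done
    return 'retried';
}
```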

Redis: The Fast and Simple Option

Best for: Simple queues, high throughput, already using Redis

Strengths

  • Fast: In-memory, microsecond latency
  • Simple: Easy to set up and use
  • Versatile: Lists, Pub/Sub, Streams
  • Already there: If you use Redis for caching
  • Lightweight: No separate infrastructure

Limitations

  • Not durable by default: data lives in memory (RDB/AOF persistence helps, but a crash can still lose recent writes)
  • No guaranteed delivery: messages can be lost on crash or failover
  • Simple routing: no exchanges or bindings like RabbitMQ

Redis Lists (Simple Queue)

const Redis = require('ioredis');
const redis = new Redis();

// Producer
async function addJob(queue, data) {
    await redis.lpush(queue, JSON.stringify(data));
    console.log('Added job:', data);
}

// Consumer
async function processJobs(queue) {
    while (true) {
        // BRPOP blocks until job available
        const result = await redis.brpop(queue, 0);
        const data = JSON.parse(result[1]);

        console.log('Processing:', data);
        await processImage(data);
    }
}

// Usage
await addJob('image-queue', { imageId: 123 });
await processJobs('image-queue');

Redis with Bull (Recommended)

const Bull = require('bull');

// Create queue
const imageQueue = new Bull('image-processing', {
    redis: { host: 'localhost', port: 6379 }
});

// Producer
await imageQueue.add({
    imageId: 123,
    userId: 456
}, {
    attempts: 3,      // Retry 3 times
    backoff: {
        type: 'exponential',
        delay: 2000
    }
});

// Consumer
imageQueue.process(async (job) => {
    console.log('Processing:', job.data);
    await processImage(job.data.imageId);
    return { success: true };
});

// Event listeners
imageQueue.on('completed', (job, result) => {
    console.log('Job completed:', job.id, result);
});

imageQueue.on('failed', (job, err) => {
    console.error('Job failed:', job.id, err);
});

// Scheduled jobs
await imageQueue.add({ imageId: 789 }, {
    delay: 60000 // Process in 60 seconds
});

// Recurring jobs
await imageQueue.add({ task: 'cleanup' }, {
    repeat: { cron: '0 * * * *' } // Every hour
});
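The exponential backoff configured above works out to roughly the schedule below (the general formula, not necessarily Bull's exact internals):

```javascript
// Exponential backoff: retry N waits roughly baseMs * 2^(N-1) ms
function backoffDelay(baseMs, attempt) {
    return baseMs * Math.pow(2, attempt - 1);
}

// With { type: 'exponential', delay: 2000 }:
// attempt 1 -> 2s, attempt 2 -> 4s, attempt 3 -> 8s
```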

Redis Pub/Sub

const Redis = require('ioredis');
const pub = new Redis();
const sub = new Redis();

// Subscribe
sub.subscribe('user-events', (err, count) => {
    console.log('Subscribed to', count, 'channels');
});

sub.on('message', (channel, message) => {
    const data = JSON.parse(message);
    console.log('Received:', channel, data);
});

// Publish
await pub.publish('user-events', JSON.stringify({
    type: 'user.created',
    userId: 123
}));

Limitation: Pub/Sub doesn't persist messages - if subscriber is down, messages are lost.

Redis Streams (Better Pub/Sub)

const Redis = require('ioredis');
const redis = new Redis();

// Producer
await redis.xadd(
    'events',
    '*',                    // Auto-generate ID
    'type', 'user.created',
    'userId', '123'
);

// Consumer group (persists each consumer's position)
await redis.xgroup('CREATE', 'events', 'processors', '0', 'MKSTREAM')
    .catch((err) => {
        // Ignore "group already exists" on restart
        if (!String(err.message).includes('BUSYGROUP')) throw err;
    });

// Consumer
while (true) {
    const results = await redis.xreadgroup(
        'GROUP', 'processors', 'worker-1',
        'COUNT', 10,
        'BLOCK', 5000,
        'STREAMS', 'events', '>'
    );

    if (results) {
        const [stream, messages] = results[0];

        for (const [id, fields] of messages) {
            console.log('Processing:', id, fields);
            await processEvent(fields);

            // Acknowledge
            await redis.xack('events', 'processors', id);
        }
    }
}

Streams are better: Messages persist, multiple consumers, acknowledgments.

AWS SQS: The Managed Solution

Best for: AWS infrastructure, scalability, no ops overhead

Strengths

  • Fully managed: No servers to maintain
  • Scalable: Handles millions of messages
  • Reliable: Distributed, redundant
  • Simple: Easy API
  • Cost-effective: Pay per request
  • Integrated: Works with Lambda, ECS, etc.

Limitations

  • Latency: Higher than Redis/RabbitMQ (network)
  • Vendor lock-in: AWS only
  • Limited routing: Simple queues only (use SNS for pub/sub)
  • Cost: Can be expensive at scale

Standard Queue

const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');

const sqs = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/image-queue';

// Send message
async function sendMessage(data) {
    await sqs.send(new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify(data)
    }));
    console.log('Sent:', data);
}

// Receive and process
async function processMessages() {
    while (true) {
        const response = await sqs.send(new ReceiveMessageCommand({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20 // Long polling
        }));

        if (response.Messages) {
            for (const message of response.Messages) {
                const data = JSON.parse(message.Body);
                console.log('Processing:', data);

                try {
                    await processImage(data);

                    // Delete after successful processing
                    await sqs.send(new DeleteMessageCommand({
                        QueueUrl: queueUrl,
                        ReceiptHandle: message.ReceiptHandle
                    }));
                } catch (err) {
                    console.error('Failed:', err);
                    // Message will return to queue after visibility timeout
                }
            }
        }
    }
}

// Usage
await sendMessage({ imageId: 123 });
await processMessages();
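The "message will return to queue" behaviour above is SQS's visibility timeout: a received message is hidden, and reappears if it isn't deleted in time. A toy model of that mechanism (illustrative, not the SQS service):

```javascript
// Toy visibility-timeout queue: receive hides a message for
// `visibilityMs`; an undeleted message becomes visible again.
class TinySQS {
    constructor(visibilityMs) {
        this.visibilityMs = visibilityMs;
        this.messages = []; // { body, invisibleUntil }
    }

    send(body) {
        this.messages.push({ body, invisibleUntil: 0 });
    }

    receive(now) {
        const msg = this.messages.find((m) => m.invisibleUntil <= now);
        if (!msg) return null;
        msg.invisibleUntil = now + this.visibilityMs; // hide it
        return msg;
    }

    delete(msg) {
        this.messages = this.messages.filter((m) => m !== msg);
    }
}
```

This is why processing must finish (and delete the message) inside the visibility timeout, or another worker will pick the same message up again.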

FIFO Queue (Order Guaranteed)

// Create FIFO queue (ends with .fifo)
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo';

// Send with deduplication
await sqs.send(new SendMessageCommand({
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(data),
    MessageGroupId: 'order-123',        // Messages with same ID are ordered
    MessageDeduplicationId: 'unique-id' // Prevents duplicates within 5 minutes
}));
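The five-minute deduplication window can be modeled like this (a sketch of the rule, not AWS's implementation):

```javascript
// SQS FIFO dedup sketch: a MessageDeduplicationId seen again within
// the 5-minute window causes the duplicate send to be dropped.
const WINDOW_MS = 5 * 60 * 1000;

function createDeduplicator() {
    const seen = new Map(); // dedupId -> timestamp of accepted send

    return function accept(dedupId, now) {
        const first = seen.get(dedupId);
        if (first !== undefined && now - first < WINDOW_MS) {
            return false; // duplicate within the window - discarded
        }
        seen.set(dedupId, now);
        return true;
    };
}
```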

Dead Letter Queue

// Configure the main queue's redrive policy in the AWS Console,
// CloudFormation, or Terraform:
{
    "QueueName": "main-queue",
    "RedrivePolicy": {
        "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789:failed-queue",
        "maxReceiveCount": 3
    }
}

// After maxReceiveCount (3) failed receives, SQS moves the message to the DLQ

SNS + SQS (Pub/Sub)

const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');

// Publish to SNS topic
const sns = new SNSClient({ region: 'us-east-1' });
await sns.send(new PublishCommand({
    TopicArn: 'arn:aws:sns:us-east-1:123456789:user-events',
    Message: JSON.stringify({ type: 'user.created', userId: 123 })
}));

// Multiple SQS queues subscribe to SNS topic
// Each gets a copy of the message

Comparison Table

Feature     | RabbitMQ                      | Redis                       | AWS SQS
Speed       | Fast (10k-50k msg/s)          | Fastest (100k+ msg/s)       | Moderate (network latency)
Reliability | Excellent (persistent, ack)   | Good (can lose messages)    | Excellent (managed, redundant)
Durability  | Yes (disk persistence)        | Optional (mostly in-memory) | Yes (distributed storage)
Ordering    | Yes (per queue)               | Yes (lists/streams)         | FIFO queues only
Routing     | Complex (exchanges, bindings) | Simple                      | Simple (SNS for pub/sub)
Scaling     | Clustering required           | Single instance/cluster     | Automatic
Operations  | Self-hosted (complex)         | Self-hosted (simple)        | Fully managed (zero ops)
Cost        | Server costs                  | Server costs                | Pay per request
Best for    | Complex routing, guarantees   | Speed, simplicity           | AWS infrastructure, scale

When to Use Each

Use RabbitMQ When:

✓ Need guaranteed delivery
✓ Complex routing requirements
✓ Multiple consumers with different patterns
✓ Enterprise/critical systems
✓ Need message priorities
✓ Already have RabbitMQ expertise
✓ On-premise infrastructure

Examples:
- Payment processing
- Order fulfillment
- Critical notifications
- Multi-tenant systems
- Microservices with complex routing

Use Redis When:

✓ Already using Redis for caching
✓ Need very high throughput
✓ Simple queue requirements
✓ Low latency critical
✓ Temporary/non-critical data
✓ Want simple setup

Examples:
- Session storage + job queue
- Real-time analytics
- Rate limiting
- Simple background tasks
- Cache invalidation

Use AWS SQS When:

✓ On AWS infrastructure
✓ Want zero operations overhead
✓ Need auto-scaling
✓ Don't need sub-second latency
✓ Integrating with Lambda/ECS
✓ Want cost-effective at scale

Examples:
- Serverless architectures
- Event-driven systems on AWS
- Image/video processing
- Data pipeline triggers
- Microservices on ECS/EKS

Real-World Example: Image Processing

RabbitMQ Approach

// API: Upload image
app.post('/upload', async (req, res) => {
    const image = await saveImage(req.file);

    // Send to RabbitMQ with priority
    await channel.sendToQueue('images', Buffer.from(JSON.stringify({
        imageId: image.id,
        type: 'thumbnail'
    })), {
        persistent: true,
        priority: 5
    });

    res.json({ imageId: image.id, status: 'processing' });
});

// Worker: Process images (amqplib, matching the producer above)
channel.prefetch(10); // up to 10 unacknowledged messages per worker

channel.consume('images', async (msg) => {
    if (msg === null) return;
    const { imageId, type } = JSON.parse(msg.content.toString());

    try {
        if (type === 'thumbnail') {
            await generateThumbnail(imageId);
        }

        channel.ack(msg);
    } catch (err) {
        channel.nack(msg, false, false); // Don't requeue - send to DLQ
    }
});

// Scale: Run 10 workers on different servers
// All consume from same queue

Redis/Bull Approach

const imageQueue = new Bull('images');

// API: Upload image
app.post('/upload', async (req, res) => {
    const image = await saveImage(req.file);

    await imageQueue.add({
        imageId: image.id,
        type: 'thumbnail'
    }, {
        attempts: 3,
        backoff: 5000
    });

    res.json({ imageId: image.id, status: 'processing' });
});

// Worker: Process images
imageQueue.process(10, async (job) => {
    const { imageId, type } = job.data;

    if (type === 'thumbnail') {
        await generateThumbnail(imageId);
    }

    return { success: true };
});

// Progress tracking
imageQueue.process(async (job) => {
    await job.progress(0);
    await processStep1(job.data);

    await job.progress(50);
    await processStep2(job.data);

    await job.progress(100);
});

// API: Check status
app.get('/status/:jobId', async (req, res) => {
    const job = await imageQueue.getJob(req.params.jobId);
    res.json({
        status: await job.getState(),
        progress: job.progress()
    });
});

AWS SQS Approach

// API: Upload image
app.post('/upload', async (req, res) => {
    const image = await saveImage(req.file);

    await sqs.send(new SendMessageCommand({
        QueueUrl: process.env.IMAGE_QUEUE_URL,
        MessageBody: JSON.stringify({
            imageId: image.id,
            type: 'thumbnail'
        })
    }));

    res.json({ imageId: image.id, status: 'processing' });
});

// Lambda worker (triggered by SQS)
exports.handler = async (event) => {
    for (const record of event.Records) {
        const { imageId, type } = JSON.parse(record.body);

        if (type === 'thumbnail') {
            await generateThumbnail(imageId);
        }
    }
};

// Scale: Lambda auto-scales with queue depth
// No worker management needed

Migration Path

From Synchronous to Async

// Phase 1: Synchronous (current)
app.post('/process', async (req, res) => {
    const result = await heavyOperation(req.body);
    res.json(result);
});

// Phase 2: Add queue (dual mode)
app.post('/process', async (req, res) => {
    if (req.query.async === 'true') {
        // New: Async mode
        const jobId = await queue.add('process', req.body);
        res.json({ jobId, status: 'processing' });
    } else {
        // Old: Sync mode (keep for backwards compatibility)
        const result = await heavyOperation(req.body);
        res.json(result);
    }
});

// Phase 3: Default to async
app.post('/process', async (req, res) => {
    const jobId = await queue.add('process', req.body);
    res.json({ jobId, status: 'processing' });
});

// Phase 4: Remove sync mode entirely

From Redis to RabbitMQ

// Wrapper to abstract queue implementation
class QueueAdapter {
    async add(queue, data) {
        if (USE_RABBITMQ) {
            await this.rabbitmq.sendToQueue(queue, Buffer.from(JSON.stringify(data)));
        } else {
            await this.redis.lpush(queue, JSON.stringify(data));
        }
    }
}

// Gradually migrate queues
// Week 1: Non-critical queues
// Week 2: Most queues
// Week 3: Critical queues
// Week 4: Remove Redis queue code

Common Mistakes

Mistake 1: Not Handling Failures

// BAD - Job fails silently
queue.process(async (job) => {
    await processImage(job.data.imageId);
    // If this throws, job is lost
});

// GOOD - Retry, then record the failure
queue.process(async (job) => {
    try {
        await processImage(job.data.imageId);
    } catch (err) {
        if (job.attemptsMade < 3) {
            throw err; // Rethrow so the queue retries
        }
        await logFailure(job, err);
        // Swallowing the error marks the job completed;
        // the failure is captured in logs/metrics instead
    }
});

Mistake 2: Not Setting Timeouts

// BAD - Job runs forever
queue.process(async (job) => {
    await longRunningTask(); // Might hang
});

// GOOD - Set timeout
queue.process(async (job) => {
    return Promise.race([
        longRunningTask(),
        new Promise((_, reject) =>
            setTimeout(() => reject(new Error('Timeout')), 60000)
        )
    ]);
});

Mistake 3: Not Monitoring Queue Depth

// Monitor queue size
setInterval(async () => {
    const counts = await queue.getJobCounts();

    if (counts.waiting > 1000) {
        console.warn('Queue backing up:', counts);
        // Alert, add workers, etc.
    }
}, 60000);

Mistake 4: Processing Jobs Twice

// BAD - No idempotency
queue.process(async (job) => {
    await db.insert('payments', { amount: 100 }); // Duplicate if retried!
});

// GOOD - Idempotent
queue.process(async (job) => {
    const { paymentId } = job.data;

    const existing = await db.findOne('payments', { id: paymentId });
    if (existing) {
        return; // Already processed
    }

    await db.insert('payments', { id: paymentId, amount: 100 });
});

Mistake 5: Not Scaling Workers

// BAD - Single worker
queue.process(async (job) => { /* ... */ });

// GOOD - Multiple workers
const CONCURRENCY = 10;
queue.process(CONCURRENCY, async (job) => { /* ... */ });

// BETTER - Scale worker processes based on queue depth
// (Bull registers a processor once per process, so don't call
// queue.process() repeatedly - adjust the number of worker processes)
async function autoScale() {
    const counts = await queue.getJobCounts();
    const desired = Math.min(Math.ceil(counts.waiting / 100), 50);
    await setWorkerReplicas(desired); // e.g. via your orchestrator's API
}

The Bottom Line

Message queues solve real problems: slow APIs, lost jobs, tight coupling. Choose based on your needs.

RabbitMQ for complex routing and guaranteed delivery. Best for critical systems.

Redis for speed and simplicity. Best when you already use Redis and need simple queues.

AWS SQS for managed, scalable queues. Best on AWS with zero ops overhead.

We started with Redis for simplicity. Lost jobs during deploys. Switched to RabbitMQ for persistence and guarantees. Now using SQS on AWS for auto-scaling without managing servers.

Pick the right tool for your requirements. Simple queues? Redis. Complex routing? RabbitMQ. On AWS? SQS. All three work - choose based on reliability needs, infrastructure, and operational overhead.