Background Jobs and Queues: Stop Blocking Your API
TL;DR
Never block API responses with slow operations. Use job queues for emails, image processing, reports. Bull with Redis is simple and reliable. Process jobs in workers, not request handlers.
My API endpoint was timing out. Users uploaded images, and the endpoint resized them, generated thumbnails, updated the database, sent webhooks, and returned a response. The whole process took 8 seconds. Users saw spinning loaders. Requests timed out. Heroku cut off requests that ran past its 30-second limit.
I moved everything to background jobs. API response dropped to 40ms. Images processed asynchronously. Users got instant feedback. No more timeouts.
Background jobs are essential for any non-trivial API. Here's how to implement them and when to use them.
The Problem: Blocking Operations
Without background jobs, slow operations block API responses:
// BAD: Everything in the request handler
app.post('/api/posts', upload.single('image'), async (req, res) => {
try {
    // 1. Resize image (3 seconds)
    const filename = `${Date.now()}-${req.file.originalname}`;
    const resized = await sharp(req.file.buffer)
      .resize(1200, 1200)
      .jpeg({ quality: 90 })
      .toBuffer();

    // 2. Upload to S3 (2 seconds)
    const { Location: imageUrl } = await s3.upload({
      Bucket: process.env.S3_BUCKET,
      Key: `posts/${filename}`,
      Body: resized
    }).promise();

    // 3. Generate thumbnails (2 seconds)
    const thumb = await sharp(req.file.buffer)
      .resize(300, 300)
      .jpeg({ quality: 80 })
      .toBuffer();

    const { Location: thumbUrl } = await s3.upload({
      Bucket: process.env.S3_BUCKET,
      Key: `thumbs/${filename}`,
      Body: thumb
    }).promise();
// 4. Save to database (200ms)
const post = await db.query('INSERT INTO posts SET ?', {
user_id: req.user.id,
image_url: imageUrl,
thumb_url: thumbUrl
});
// 5. Send notifications (500ms)
await sendNotifications(post.id);
// 6. Update search index (1 second)
await searchIndex.add(post);
// Total time: ~8 seconds
res.json(post);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
Problems:
- User waits 8 seconds for response
- Request can timeout (30s limit on most platforms)
- Can't retry failures easily
- One failing step (the S3 upload, the webhook, the search index) fails the entire request
- Wastes API server resources
The Solution: Background Jobs
Move slow operations to background workers:
const Queue = require('bull');
const imageQueue = new Queue('image-processing', {
redis: { host: 'localhost', port: 6379 }
});
// GOOD: Fast API response, process in background
app.post('/api/posts', upload.single('image'), async (req, res) => {
// 1. Save to database immediately (200ms)
const post = await db.query('INSERT INTO posts SET ?', {
user_id: req.user.id,
status: 'processing'
});
// 2. Queue background job
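  // (Keep job payloads small: for large files, store the upload on disk/S3 first and queue a reference, not a base64 string)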
await imageQueue.add('process-image', {
postId: post.id,
imageBuffer: req.file.buffer.toString('base64'),
userId: req.user.id
});
// 3. Return immediately
res.json({
id: post.id,
status: 'processing',
message: 'Image is being processed'
});
});
// Worker processes jobs in background
imageQueue.process('process-image', 5, async (job) => {
const { postId, imageBuffer, userId } = job.data;
const buffer = Buffer.from(imageBuffer, 'base64');
// All the slow stuff happens here
const resized = await sharp(buffer).resize(1200, 1200).jpeg().toBuffer();
const imageUrl = await uploadToS3(resized, 'posts');
const thumb = await sharp(buffer).resize(300, 300).jpeg().toBuffer();
const thumbUrl = await uploadToS3(thumb, 'thumbs');
await db.query('UPDATE posts SET ? WHERE id = ?', [{
image_url: imageUrl,
thumb_url: thumbUrl,
status: 'published'
}, postId]);
await sendNotifications(postId);
await searchIndex.add({ id: postId, userId });
return { success: true, postId };
});
Benefits:
- API responds in 200ms instead of 8s
- Jobs processed asynchronously
- Automatic retries on failure
- Can scale workers independently
- Better resource utilization
Bull: Redis-Backed Job Queue
Bull is the most popular Node.js queue library:
npm install bull ioredis
Basic Setup
const Queue = require('bull');
// Create queue
const emailQueue = new Queue('email', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
}
});
// Add job to queue
await emailQueue.add('welcome-email', {
userId: 123,
email: 'user@example.com',
name: 'John'
});
// Process jobs
emailQueue.process('welcome-email', async (job) => {
const { userId, email, name } = job.data;
await sendEmail({
to: email,
subject: 'Welcome!',
template: 'welcome',
data: { name }
});
// Update user record
await db.query('UPDATE users SET welcome_sent = true WHERE id = ?', [userId]);
return { success: true, userId };
});
Job Options
// Delayed job
await emailQueue.add('reminder', { userId: 123 }, {
delay: 3600000 // 1 hour from now
});
// Priority (lower number = higher priority)
await emailQueue.add('urgent', { data }, { priority: 1 });
await emailQueue.add('normal', { data }, { priority: 5 });
// Retry configuration
await emailQueue.add('api-call', { url }, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000
}
});
// Remove job after completion
await emailQueue.add('temp-job', { data }, {
removeOnComplete: true,
removeOnFail: false
});
// Job timeout
await emailQueue.add('long-task', { data }, {
  timeout: 30000 // Fail the job if it takes longer than 30 seconds
});
Worker Configuration
// Process multiple jobs concurrently
emailQueue.process('send-email', 10, async (job) => {
// Processes up to 10 jobs simultaneously
await sendEmail(job.data);
});
// Multiple job types in one queue
emailQueue.process('welcome-email', async (job) => {
await sendWelcomeEmail(job.data);
});
emailQueue.process('reset-password', async (job) => {
await sendPasswordResetEmail(job.data);
});
emailQueue.process('notification', async (job) => {
await sendNotificationEmail(job.data);
});
// Separate worker file
// worker.js
const emailQueue = new Queue('email', { redis: redisConfig });
// A '*' processor catches every named job, so one handler can dispatch on job.name
emailQueue.process('*', async (job) => {
  console.log(`Processing job ${job.id} (${job.name})`);
switch (job.name) {
case 'welcome-email':
return await sendWelcomeEmail(job.data);
case 'notification':
return await sendNotification(job.data);
default:
throw new Error(`Unknown job type: ${job.name}`);
}
});
console.log('Worker started');
Run workers separately:
# API server
node app.js
# Workers (separate processes)
node worker.js
node worker.js
node worker.js
Job Lifecycle and Events
const queue = new Queue('tasks');
// Job events
queue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
queue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed:`, error.message);
// Alert, log, or take action
});
queue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
queue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`);
});
// Queue events
queue.on('error', (error) => {
console.error('Queue error:', error);
});
queue.on('waiting', (jobId) => {
console.log(`Job ${jobId} is waiting`);
});
queue.on('active', (job) => {
console.log(`Job ${job.id} started`);
});
// Job progress reporting
queue.process(async (job) => {
const items = job.data.items;
for (let i = 0; i < items.length; i++) {
await processItem(items[i]);
// Report progress
await job.progress((i + 1) / items.length * 100);
}
return { processed: items.length };
});
Common Job Patterns
1. Email Jobs
const emailQueue = new Queue('email');
// Queue email
async function queueEmail(to, template, data) {
await emailQueue.add('send', {
to,
template,
data,
queuedAt: Date.now()
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000
}
});
}
// Worker
emailQueue.process('send', async (job) => {
const { to, template, data } = job.data;
const html = await renderTemplate(template, data);
await sendEmail({
to,
subject: getSubject(template),
html
});
return { sent: true, to };
});
// Usage
await queueEmail('user@example.com', 'welcome', { name: 'John' });
2. Image Processing
const imageQueue = new Queue('images');
imageQueue.process('resize', 5, async (job) => {
  const { imageUrl, sizes } = job.data;
  const filename = imageUrl.split('/').pop(); // derive an output name from the source URL
  const image = await downloadImage(imageUrl);
const results = {};
for (const [name, { width, height }] of Object.entries(sizes)) {
const resized = await sharp(image)
.resize(width, height, { fit: 'cover' })
.jpeg({ quality: 90 })
.toBuffer();
const url = await uploadToS3(resized, `${name}/${filename}`);
results[name] = url;
await job.progress(Object.keys(results).length / Object.keys(sizes).length * 100);
}
return results;
});
// Usage
const job = await imageQueue.add('resize', {
imageUrl: 'https://example.com/image.jpg',
sizes: {
thumbnail: { width: 150, height: 150 },
medium: { width: 600, height: 600 },
large: { width: 1200, height: 1200 }
}
});
3. Report Generation
const reportQueue = new Queue('reports');
reportQueue.process('generate', async (job) => {
const { userId, reportType, params } = job.data;
await job.progress(10);
// Fetch data (slow)
const data = await fetchReportData(reportType, params);
await job.progress(40);
// Generate report (slow)
const pdf = await generatePDF(data);
await job.progress(70);
// Upload to S3
const url = await uploadToS3(pdf, `reports/${userId}/${Date.now()}.pdf`);
await job.progress(90);
// Notify user
await emailQueue.add('send', {
to: params.email,
template: 'report-ready',
data: { url }
});
await job.progress(100);
return { url, size: pdf.length };
});
// API endpoint
app.post('/api/reports', async (req, res) => {
const job = await reportQueue.add('generate', {
userId: req.user.id,
reportType: req.body.type,
params: req.body.params
});
res.json({
jobId: job.id,
status: 'queued',
message: 'Report is being generated. You will receive an email when ready.'
});
});
// Check job status
app.get('/api/reports/:jobId', async (req, res) => {
const job = await reportQueue.getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: 'Job not found' });
}
const state = await job.getState();
const progress = job.progress();
res.json({
id: job.id,
state,
progress,
result: state === 'completed' ? job.returnvalue : null
});
});
4. Webhook Delivery
const webhookQueue = new Queue('webhooks');
webhookQueue.process('deliver', async (job) => {
const { url, payload, headers } = job.data;
  const response = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      ...headers
    },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(10000) // abort if no response within 10 seconds
  });
if (!response.ok) {
throw new Error(`Webhook failed: ${response.status}`);
}
return {
status: response.status,
deliveredAt: Date.now()
};
});
// Queue webhook with retries
async function sendWebhook(url, payload) {
await webhookQueue.add('deliver', {
url,
payload,
headers: {
'X-Webhook-Signature': signPayload(payload)
}
}, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000
}
});
}
5. Data Sync Jobs
const syncQueue = new Queue('sync');
// Recurring sync job
syncQueue.add('sync-users', {}, {
repeat: {
cron: '0 * * * *' // Every hour
}
});
syncQueue.process('sync-users', async (job) => {
const users = await fetchUsersFromExternalAPI();
for (const user of users) {
await db.query(
'INSERT INTO users SET ? ON DUPLICATE KEY UPDATE ?',
[user, user]
);
}
return { synced: users.length };
});
Bull Dashboard
Monitor jobs with Bull Board:
npm install @bull-board/api @bull-board/express
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullAdapter(emailQueue),
new BullAdapter(imageQueue),
new BullAdapter(reportQueue)
],
serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queues
The dashboard lets you:
- View active, waiting, completed, and failed jobs
- Inspect job details and logs
- Retry failed jobs
- Clean out old jobs
- Pause and resume queues
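The dashboard is aimed at humans; for alerts or health checks you can also read queue counts programmatically with Bull's getJobCounts(). A minimal sketch (the route path and the failure threshold below are arbitrary choices for illustration):

// Simple queue health endpoint
app.get('/admin/queues/health', async (req, res) => {
  // Resolves to counts like { waiting, active, completed, failed, delayed }
  const counts = await emailQueue.getJobCounts();

  // Threshold is arbitrary; tune it (or alert on growth) for your workload
  const healthy = counts.failed < 100;
  res.status(healthy ? 200 : 503).json(counts);
});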
Scheduled Jobs (Cron)
// Run every day at 2 AM
await cleanupQueue.add('cleanup-old-data', {}, {
repeat: {
cron: '0 2 * * *'
}
});
// Run every 15 minutes
await backupQueue.add('backup', {}, {
repeat: {
every: 15 * 60 * 1000
}
});
// Run on specific days
await reportQueue.add('weekly-report', {}, {
repeat: {
cron: '0 9 * * 1' // Monday at 9 AM
}
});
Job Dependencies
Bull has no built-in job dependencies (BullMQ adds those with flows), so chain steps by having each processor enqueue the next job when it succeeds:

// Each step enqueues the next one on success
queue.process('step-1', async (job) => {
  const result = await doStep1(job.data); // doStep1/doStep2 are placeholder helpers
  await queue.add('step-2', { previous: result });
  return result;
});

queue.process('step-2', async (job) => {
  const result = await doStep2(job.data.previous);
  await queue.add('step-3', { previous: result });
  return result;
});

// step-2 runs only after step-1 succeeds, step-3 only after step-2
Error Handling
queue.process(async (job) => {
  try {
    return await doWork(job.data);
  } catch (error) {
    // Log error
    console.error(`Job ${job.id} failed:`, error);

    // Transient errors: rethrow so Bull retries (up to opts.attempts)
    if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') {
      throw error;
    }

    // Permanent errors: mark the job so Bull won't retry it, then fail it
    await job.discard();
    await notifyError(job, error);
    throw error;
  }
});
// Global error handler
queue.on('failed', async (job, error) => {
if (job.attemptsMade >= job.opts.attempts) {
// All retries exhausted
await logPermanentFailure(job, error);
await alertTeam(job, error);
}
});
Graceful Shutdown
async function gracefulShutdown() {
  console.log('Shutting down gracefully...');

  // Stop this worker from picking up new jobs (local pause)
  await queue.pause(true);

  // close() waits for active jobs to finish before disconnecting from Redis
  await queue.close();

  console.log('All jobs completed. Exiting.');
  process.exit(0);
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
Serverless Alternative: AWS Lambda
For simple jobs, you can skip Bull and Redis entirely and invoke a Lambda function asynchronously:
// API queues Lambda invocation
const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();
app.post('/api/process', async (req, res) => {
// Invoke Lambda asynchronously
await lambda.invoke({
FunctionName: 'image-processor',
InvocationType: 'Event', // Async
Payload: JSON.stringify({
imageUrl: req.body.imageUrl,
userId: req.user.id
})
}).promise();
res.json({
status: 'processing',
message: 'Image processing started'
});
});
// Lambda function (image-processor)
exports.handler = async (event) => {
const { imageUrl, userId } = event;
const image = await downloadImage(imageUrl);
const resized = await sharp(image).resize(1200, 1200).jpeg().toBuffer();
const url = await uploadToS3(resized);
await updateDatabase(userId, url);
await sendNotification(userId, url);
return { success: true };
};
When to Use What
Use Bull + Redis when:
- Need reliable job processing
- Want retry logic and error handling
- Need job prioritization
- Running on persistent servers
- Want dashboard and monitoring
Use Lambda/serverless when:
- Simple one-off tasks
- Don't want to manage workers
- Irregular workload (sporadic jobs)
- Want auto-scaling
- Already on AWS/serverless architecture
Use cron/scheduled tasks when:
- Regular scheduled jobs only
- Don't need queueing
- Low volume
- Simple scripts
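For that last case, a plain scheduler with no queue or Redis can be enough. A minimal sketch, assuming the node-cron package and a hypothetical cleanupOldData() helper:

const cron = require('node-cron');

// Run a simple cleanup script every night at 2 AM; no queue, no workers
cron.schedule('0 2 * * *', async () => {
  await cleanupOldData(); // hypothetical helper
  console.log('Nightly cleanup finished');
});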
Production Setup
// config/queue.js
const Queue = require('bull');
const Redis = require('ioredis');
const redisConfig = {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null, // Bull requirement
enableReadyCheck: false
};
// Create queue with proper config
function createQueue(name, options = {}) {
return new Queue(name, {
redis: redisConfig,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 500, // Keep last 500 failed jobs
...options
}
});
}
module.exports = {
emailQueue: createQueue('email'),
imageQueue: createQueue('images', { timeout: 60000 }),
reportQueue: createQueue('reports', { timeout: 300000 })
};
// worker.js
const { emailQueue, imageQueue, reportQueue } = require('./config/queue');
// Process jobs ('*' handles all named jobs in each queue)
emailQueue.process('*', 5, require('./workers/email'));
imageQueue.process('*', 3, require('./workers/image'));
reportQueue.process('*', 1, require('./workers/report'));
// Monitor
emailQueue.on('completed', (job) => {
console.log(`Email job ${job.id} completed`);
});
emailQueue.on('failed', (job, error) => {
console.error(`Email job ${job.id} failed:`, error.message);
});
console.log('Worker started');
// Deployment with PM2
// ecosystem.config.js
module.exports = {
apps: [
{
name: 'api',
script: 'app.js',
instances: 2,
exec_mode: 'cluster'
},
{
name: 'worker',
script: 'worker.js',
instances: 4
}
]
};
The Bottom Line
Background jobs are essential for production APIs:
Never block API responses with slow operations. Queue them.
Use Bull + Redis for reliable job processing with retries and monitoring.
Process jobs in separate workers, not in your API servers.
Monitor job queues - failed jobs mean something is broken.
Handle failures gracefully with retries and alerts.
Moving slow operations to background jobs dropped my API response time from 8 seconds to 40ms. Users got instant feedback. No more timeouts. No more angry users.
Implement background jobs before you need them. Your users will thank you.