Skip to main content

Queue System with Bull

JifiJs includes a powerful queue system built on Bull, a Redis-based queue for Node.js. This allows you to process tasks asynchronously in the background, improving application performance and user experience.

🎯 Why Use Queues?​

Queues are essential for:

  • Email sending - Don't make users wait for emails to be sent
  • File processing - Handle uploads, conversions, and optimizations asynchronously
  • Data exports - Generate large reports without blocking requests
  • Scheduled tasks - Execute jobs at specific times
  • Rate-limited operations - Control API calls to third-party services
  • Heavy computations - Offload CPU-intensive tasks

πŸ—οΈ Architecture​

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Client β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
β”‚ HTTP Request
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Controller β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚ Add Job
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Bull Queue │◄────►│ Redis β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”‚ Process Job
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Job Handler β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ“¦ Queue Configuration​

Environment Setup​

Configure Redis connection in .env:

# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0

# Queue Configuration
QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_DB=1

Queue Instance​

Create a queue instance in queues/email.queue.ts:

import Bull from 'bull';
import config from '../config';

export const emailQueue = new Bull('email-processing', {
redis: {
host: config.queue.redis.host,
port: config.queue.redis.port,
password: config.queue.redis.password,
db: config.queue.redis.db,
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000,
},
removeOnComplete: true,
removeOnFail: false,
},
});

πŸš€ Creating Jobs​

Adding Jobs to the Queue​

import { emailQueue } from '../queues/email.queue';

// In your service or controller
export class UserService {
async registerUser(userData: IUser) {
const user = await User.create(userData);

// Add email job to queue
await emailQueue.add('welcome-email', {
userId: user.id,
email: user.email,
name: user.name,
}, {
delay: 5000, // Send after 5 seconds
priority: 1, // Higher priority = processed first
});

return user;
}
}

Job Priorities​

// High priority (1) - processed first
await emailQueue.add('critical-alert', data, { priority: 1 });

// Normal priority (default)
await emailQueue.add('newsletter', data);

// Low priority (10) - processed last
await emailQueue.add('cleanup', data, { priority: 10 });

βš™οΈ Processing Jobs​

Job Processor​

Create a processor in queues/processors/email.processor.ts:

import { Job } from 'bull';
import { emailQueue } from '../email.queue';
import { EmailService } from '../../services/email.service';

interface WelcomeEmailData {
userId: string;
email: string;
name: string;
}

// Process welcome emails
emailQueue.process('welcome-email', async (job: Job<WelcomeEmailData>) => {
const { email, name } = job.data;

console.log(`Processing welcome email for ${email}`);

try {
await EmailService.sendWelcomeEmail(email, name);

// Update progress
await job.progress(100);

return { success: true, email };
} catch (error) {
console.error('Failed to send welcome email:', error);
throw error; // Will trigger retry
}
});

// Process password reset emails
emailQueue.process('password-reset', async (job: Job) => {
const { email, token } = job.data;

await EmailService.sendPasswordResetEmail(email, token);

return { success: true };
});

Concurrent Processing​

Control how many jobs are processed simultaneously:

// Process 5 jobs concurrently
emailQueue.process('newsletter', 5, async (job: Job) => {
await EmailService.sendNewsletter(job.data);
});

πŸ”„ Job Lifecycle​

Job States​

Jobs go through several states:

  1. waiting - Job added to queue
  2. active - Job being processed
  3. completed - Job finished successfully
  4. failed - Job failed (will retry if configured)
  5. delayed - Job scheduled for future execution

Monitoring Jobs​

import { emailQueue } from './queues/email.queue';

// Get job counts
const counts = await emailQueue.getJobCounts();
console.log(counts);
// {
// waiting: 10,
// active: 2,
// completed: 145,
// failed: 3,
// delayed: 5
// }

// Get specific job
const job = await emailQueue.getJob('job-id');
console.log(job.progress());
console.log(job.getState());

🎯 Advanced Features​

Scheduled Jobs (Cron-like)​

import { emailQueue } from './queues/email.queue';

// Run every day at 9 AM
emailQueue.add('daily-report', {}, {
repeat: {
cron: '0 9 * * *',
tz: 'America/New_York',
},
});

// Run every hour
emailQueue.add('cleanup', {}, {
repeat: {
every: 3600000, // milliseconds
},
});

Job Events​

Listen to queue events:

emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});

emailQueue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed:`, error.message);
// Send alert to monitoring system
});

emailQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});

emailQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`);
});

Rate Limiting​

Limit job processing rate:

const apiQueue = new Bull('api-calls', {
redis: redisConfig,
limiter: {
max: 100, // Max 100 jobs
duration: 60000 // Per 60 seconds
},
});

Job Progress Tracking​

emailQueue.process('bulk-email', async (job: Job) => {
const emails = job.data.recipients;
const total = emails.length;

for (let i = 0; i < total; i++) {
await sendEmail(emails[i]);

// Update progress
const progress = Math.round((i + 1) / total * 100);
await job.progress(progress);
}

return { sent: total };
});

πŸ›‘οΈ Error Handling​

Retry Strategy​

await emailQueue.add('important-task', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000, // Start with 2s, then 4s, 8s, 16s, 32s
},
});

Custom Retry Logic​

emailQueue.process('api-call', async (job: Job) => {
try {
return await makeApiCall(job.data);
} catch (error) {
if (error.statusCode === 429) {
// Rate limited, wait longer
throw new Error('Rate limited, retry later');
}

if (error.statusCode >= 500) {
// Server error, retry
throw error;
}

// Client error, don't retry
job.moveToFailed({ message: 'Client error' }, true);
}
});

πŸ“Š Queue Management​

Cleaning Old Jobs​

// Remove completed jobs older than 24 hours
await emailQueue.clean(24 * 3600 * 1000, 'completed');

// Remove failed jobs older than 7 days
await emailQueue.clean(7 * 24 * 3600 * 1000, 'failed');

Pausing and Resuming​

// Pause the queue
await emailQueue.pause();

// Resume processing
await emailQueue.resume();

// Check if paused
const isPaused = await emailQueue.isPaused();

Emptying the Queue​

// Remove all waiting jobs
await emailQueue.empty();

// Remove all jobs (waiting, active, delayed, failed)
await emailQueue.obliterate({ force: true });

🎨 Bull Board (UI Dashboard)​

Add a web UI to monitor queues:

import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue } from './queues/email.queue';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});

app.use('/admin/queues', serverAdapter.getRouter());

Access the dashboard at http://localhost:3000/admin/queues

πŸ“ Best Practices​

1. Use Separate Queues​

Create dedicated queues for different job types:

const emailQueue = new Bull('emails');
const fileQueue = new Bull('file-processing');
const notificationQueue = new Bull('notifications');

2. Set Appropriate Timeouts​

await queue.add('long-task', data, {
timeout: 300000, // 5 minutes
});

3. Monitor Queue Health​

// Check queue health periodically
setInterval(async () => {
const counts = await emailQueue.getJobCounts();

if (counts.failed > 100) {
// Alert: Too many failed jobs
console.error('Queue health check failed');
}
}, 60000);

4. Idempotent Jobs​

Design jobs to be safely retried:

emailQueue.process('send-email', async (job: Job) => {
const { userId, emailId } = job.data;

// Check if already sent
const sent = await EmailLog.findOne({ userId, emailId });
if (sent) {
return { status: 'already_sent' };
}

await sendEmail(userId);

// Record that we sent it
await EmailLog.create({ userId, emailId, sentAt: new Date() });

return { status: 'sent' };
});

πŸ” Debugging​

Enable debug logging:

// In development
if (process.env.NODE_ENV === 'development') {
emailQueue.on('global:completed', (jobId) => {
console.log(`Job ${jobId} completed`);
});

emailQueue.on('global:failed', (jobId, err) => {
console.error(`Job ${jobId} failed:`, err);
});
}

πŸ’‘ Pro Tip: Always monitor your queues in production. Failed jobs can accumulate and cause issues. Set up alerts for high failure rates.