Job Queue using BullMQ, Redis, and Node.js - Example with Worker, Queue Events, and Timeout Handling

Using BullMQ, Redis, and Node.js. In modern backend applications, offloading resource-heavy or asynchronous tasks to a background job queue is a must. Whether you’re validating large datasets, sending emails, or processing images, using a job queue ensures your app stays responsive and scalable.

Job Queue using BullMQ, Redis, and Node.js

Understand how to build and monitor a job queue using BullMQ, Redis, and Node.js.

In modern backend applications, offloading resource-heavy or asynchronous tasks to a background job queue is a must. Whether you’re validating large datasets, sending emails, or processing images, using a job queue ensures your app stays responsive and scalable.

In this blog, we’ll walk through setting up a BullMQ-based job queue in Node.js, complete with:

  • Queue creation
  • Worker processing
  • Queue event monitoring
  • Graceful shutdown
  • Timeout and concurrency settings

BullMQ is a Node.js library for handling background jobs using Redis. It’s the next-generation version of bull, built with better TypeScript support and flexibility.

What is BullMQ?

BullMQ is a Node.js library for handling background jobs using Redis. It’s the next-generation version of bull, built with better TypeScript support and flexibility.

We’ll be using:

  • Queue – for adding jobs
  • Worker – for processing jobs
  • QueueEvents – to track job lifecycle (success/failure)

Project Structure

/project-root
  ├── worker.js
  ├── index.js
  └── processor.js

1. Setup the Queue and Worker – index.js

// index.js
const { Queue, Worker, QueueEvents } = require("bullmq");
const { createClient } = require("redis");
const logger = console; // replace with winston/pino in production

const queueName = "validationQueue";
const processorFile = __dirname + "/processor.js";

// Redis connection
const connection = createClient({ url: "redis://localhost:6379" });
connection.connect();

// Job Queue
const validationJobQueue = new Queue(queueName, {
  prefix: "{qctool}",
  connection,
});
validationJobQueue.on("ioredis:close", () => {
  logger.warn("Redis connection closed");
});

// Queue Events
const validationQueueEvents = new QueueEvents(queueName, {
  prefix: "{qctool}",
  connection,
});
validationQueueEvents.on("completed", ({ jobId, returnvalue }) => {
  logger.info(`Job ${jobId} completed with status: ${returnvalue.status}`);
});
validationQueueEvents.on("failed", ({ jobId, failedReason }) => {
  logger.error(`Job ${jobId} failed: ${failedReason}`);
});

// Worker
const validationQueue = {
  concurrency: 5,
  timeout: 10 * 1000, // 10 seconds
};

const worker = new Worker(queueName, processorFile, {
  prefix: "{qctool}",
  connection,
  concurrency: validationQueue.concurrency,
  timeout: validationQueue.timeout,
});

worker.on("completed", ({ id }) => {
  logger.info(`Worker processed job ${id} successfully.`);
});

worker.on("failed", (job = {}, err) => {
  const { id } = job;
  logger.error(`Worker failed job ${id}: ${err.message}`);
});

// Graceful shutdown
process.on("SIGINT", async () => {
  logger.info("Shutting down worker...");
  await worker.close();
  process.exit(0);
});

⚙️ 2. Job Processor – processor.js

// processor.js
module.exports = async function (job) {
  const { data } = job;

  // Simulate processing time
  await new Promise((resolve) => setTimeout(resolve, 2000));

  if (data.shouldFail) {
    throw new Error("Simulated job failure");
  }

  return { status: "success", processedAt: new Date().toISOString() };
};

➕ 3. Add a Job – Test It

You can add a job directly in index.js after the queue is defined, or in a separate test script:

validationJobQueue.add("validate-data", {
  input: "Sample Data",
  shouldFail: false, // toggle for testing
});

Output Preview

Job 26 has completed with status success
Worker processed job 26 successfully.

Or if it fails:

Worker failed job 27: Simulated job failure
Job 27 failed: Simulated job failure

✅ Key Highlights

Feature Code Reference
Queue Definition new Queue(...)
Worker Setup new Worker(...)
Job Timeout timeout: 10000
Concurrency concurrency: 5
Event Handling `QueueEvents.on("completed"
Shutdown Hook process.on("SIGINT", ...)

Conclusion

Using BullMQ with Redis gives you full control over job processing with built-in retries, concurrency, timeouts, and events. This modular approach makes it easy to scale background tasks, monitor job status, and keep your main application responsive.


Additionally Try

  • Adding retry strategies (attempts, backoff)
  • Visualize with bull-board
  • Add Redis clustering and distributed workers for scaling

Happy Coding! 🚀