Wednesday, March 25, 2026

Properly Cancelling Jobs in BullMQ

Manuel Astudillo
@manast


Cancelling a running job sounds simple, but getting it right in a distributed system requires careful thought. A naive approach, such as killing the process or simply ignoring a running job, can lead to resource leaks, orphaned connections, and inconsistent state. BullMQ provides a clean, standards-based cancellation mechanism built on the Node.js AbortController API, giving you fine-grained control over how and when jobs stop.

In this article, we'll cover:

  1. Local cancellation using the built-in AbortSignal API
  2. Remote cancellation across workers using Redis Pub/Sub
  3. Graceful shutdown and clean timeouts using signal composition
  4. Best practices for cleanup and error handling

The Problem with Killing Jobs

When a long-running job needs to stop, whether because a user cancelled an operation, a timeout was exceeded, or the system is shutting down, you can't just abandon it. Consider what happens if you simply ignore a running job:

  • HTTP requests continue consuming bandwidth and remote server resources
  • Database transactions remain open, holding locks
  • File handles and temporary files are never cleaned up
  • External services are never notified of the abort
  • Memory allocated for the operation may never be freed

What you need is cooperative cancellation: the job processor itself decides how to wind down gracefully when asked to stop. This is exactly what BullMQ's AbortSignal integration provides.

Local Cancellation with AbortSignal

BullMQ workers pass an optional AbortSignal as the third parameter to your processor function. When you call worker.cancelJob(jobId), the signal is aborted, and your processor can react immediately.

Important: cancelJob() does not fail the job by itself. It only aborts the signal. Your processor must reject/throw when it detects cancellation; then BullMQ applies normal failure/retry rules based on the error type.

Basic Setup

Cancellation in BullMQ is cooperative: calling cancelJob() only aborts the signal; the processor must listen for that signal and stop on its own. If your processor ignores the signal, the job keeps running.

import { Worker } from "bullmq";

const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    // signal is an AbortSignal, your code must react to it
    // for cancellation to actually work. See patterns below.
    return await doWork(job.data, signal);
  },
  { connection: { host: "localhost", port: 6379 } },
);

To request cancellation, call cancelJob on the worker instance. This aborts the signal but does not stop or fail the job by itself; your processor must handle it:

// Cancel a specific job (aborts its signal)
worker.cancelJob("job-123");

// Cancel with a reason (accessible via signal.reason)
worker.cancelJob("job-123", "User requested cancellation");

// Cancel all active jobs (useful during shutdown)
worker.cancelAllJobs("System shutting down");

The sections below show how to write processors that actually respond to cancellation.

The most responsive approach is to listen for the abort event on the signal. When the event fires, reject the promise directly, with no flags and no polling loops:

const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      // Listen for abort event, reject immediately
      signal?.addEventListener("abort", () => {
        console.log(`Job ${job.id} cancellation requested`);

        // Clean up resources
        clearInterval(interval);

        // Reject with error
        reject(new Error("Job was cancelled"));
      });

      // Your processing logic
      const interval = setInterval(() => {
        processNextItem();
      }, 100);
    });
  },
  { connection },
);

Why this works:

  • Immediate response: No polling delay; the abort listener fires the instant cancellation is requested
  • More efficient: No CPU wasted checking flags in loops
  • Cleaner code: Separation of concerns between work and cancellation
  • Standard pattern: Matches how Web APIs like fetch() handle AbortSignal

If cancellation is user-initiated and should not retry, throw UnrecoverableError instead of Error.

Using with fetch and Other Native APIs

One of the most useful aspects of the AbortSignal standard is that many Web APIs support it natively. You can pass the signal directly to fetch, and the HTTP request will be truly cancelled at the network level:

const worker = new Worker(
  "scraper-queue",
  async (job, token, signal) => {
    // The signal cancels the actual HTTP request, not just the job
    const response = await fetch(job.data.url, { signal });
    const html = await response.text();
    return { length: html.length, status: response.status };
  },
  { connection },
);

When the signal fires, fetch rejects with an AbortError (a DOMException), which propagates to the worker automatically.
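
When a processor mixes fetch with other work, it can help to tell an abort apart from a genuine failure. A minimal sketch (the helper name isAbortError is ours, not a BullMQ API):

```typescript
// Sketch: recognizing an abort rejection. In Node.js, an aborted fetch
// rejects with a DOMException whose name is "AbortError".
function isAbortError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { name?: unknown }).name === "AbortError"
  );
}
```

You could use this in a catch block to rethrow aborts as UnrecoverableError while letting ordinary network errors follow the normal retry path.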

This pattern works with any API that accepts an AbortSignal:

  • fetch(url, { signal }): HTTP requests
  • addEventListener(event, handler, { signal }): auto-removes the listener on abort
  • Some database and HTTP client libraries
  • Node.js APIs that accept an AbortSignal option
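
For CPU-bound loops that never hand the signal to an I/O API, an alternative to the event pattern is to check the signal between iterations with signal.throwIfAborted() (Node 17.3+). A sketch, where summing chunks stands in for your own per-chunk work:

```typescript
// Sketch: cooperative cancellation in a CPU-bound loop.
// throwIfAborted() throws signal.reason as soon as the signal is aborted.
async function processAll(
  chunks: number[],
  signal?: AbortSignal,
): Promise<number> {
  let total = 0;
  for (const chunk of chunks) {
    signal?.throwIfAborted(); // stop between chunks if cancelled
    total += chunk; // your real per-chunk work goes here
  }
  return total;
}
```

This only reacts at chunk boundaries, so keep chunks small enough that cancellation latency stays acceptable.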

Controlling Retry Behavior

How you throw determines what happens to the job after cancellation:

Error Type                    Retries?                   Use Case
new Error(...)                Yes (if attempts remain)   Transient cancellation, retry later
new UnrecoverableError(...)   No                         Permanent cancellation, user cancelled

import { Worker } from "bullmq";

// With regular Error, job will retry if attempts remain
const worker = new Worker(
  "retryQueue",
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      signal?.addEventListener("abort", () => {
        reject(new Error("Cancelled, will retry"));
      });

      // Your work...
    });
  },
  { connection },
);

// Set attempts when adding jobs
await queue.add("task", data, { attempts: 3 });

import { Worker, UnrecoverableError } from "bullmq";

// With UnrecoverableError, no retries, cancellation is permanent
const worker = new Worker(
  "noRetryQueue",
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      signal?.addEventListener("abort", () => {
        reject(new UnrecoverableError("Cancelled permanently"));
      });

      // Your work...
    });
  },
  { connection },
);

You can also inspect signal.reason to decide at runtime:

const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      signal?.addEventListener("abort", () => {
        const reason = signal.reason ?? "cancelled";
        if (reason === "user-cancelled") {
          reject(new UnrecoverableError("Cancelled by user"));
        } else {
          reject(new Error(`Cancelled: ${reason}`));
        }
      });

      // Your work...
    });
  },
  { connection },
);

Remote Cancellation via Redis Pub/Sub

The built-in worker.cancelJob() method works on the local worker instance, so you need a reference to the worker object that is processing the job. But what if the cancellation request comes from a different process? For example:

  • A web server receives a user's cancel request, but the job is running on a separate worker process
  • You have multiple worker servers and don't know which one is processing a specific job
  • A monitoring dashboard needs to cancel jobs across the cluster

The solution is to use Redis Pub/Sub as a signaling channel between the process that wants to cancel the job and the worker that is running it.

Architecture

┌─────────────┐     Redis Pub/Sub      ┌──────────────┐
│  API Server │ ──── PUBLISH ────────► │   Worker 1   │
│ (cancel req)│   {action, jobId,      │  (processing │
│             │    reason, ...}        │   jobs)      │
└─────────────┘                        └──────────────┘
                                       ┌──────────────┐
                ──── PUBLISH ────────► │   Worker 2   │
                  same payload         │  (processing │
                                       │   jobs)      │
                                       └──────────────┘

Implementation

We recommend using a single control channel per queue, following the same prefix and queue name that BullMQ already uses for its Redis keys: {prefix}:{queueName}:control. An action field in each message distinguishes the operation type; here we use "cancel", but the same channel could carry other control actions in the future (e.g. worker concurrency changes).

By default the prefix is bull, so for a queue named my-queue the channel would be bull:my-queue:control.

If you only have a handful of queues, you can simplify further by using a single shared channel for all of them, for example {prefix}:control, and including a queueName field in each message so workers can filter. This avoids one subscription per queue and keeps the setup minimal.

The per-queue channel only starts to matter when you have many queues and want to avoid dispatching irrelevant messages, but since control messages are infrequent in practice, either approach works well.
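
The shared-channel variant can be sketched as a small message filter. The payload shape (the queueName field) is our convention, not part of BullMQ:

```typescript
// Sketch of the shared-channel convention: one control channel for all
// queues, with a queueName field so each worker handles only its own jobs.
interface ControlMessage {
  action: string;
  queueName: string;
  jobId: string;
  reason?: string;
}

function shouldHandle(raw: string, myQueue: string): ControlMessage | null {
  try {
    const msg = JSON.parse(raw) as ControlMessage;
    return msg.action === "cancel" && msg.queueName === myQueue ? msg : null;
  } catch {
    return null; // ignore malformed messages
  }
}
```

A worker's message handler would call shouldHandle first and forward the jobId to worker.cancelJob only on a match.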

Important: Pub/Sub is fire-and-forget. If no subscriber is listening when a message is published, it is lost; Redis does not buffer Pub/Sub messages. This means:

  • If all workers for a queue are down when you publish a cancel request, nobody will receive it.
  • The publisher gets no confirmation that the message was actually processed.

For most cancellation use cases this is fine, and you can always publish the message several times to increase the chances that the worker actually processing the job receives it.
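
That mitigation can be sketched as a small republish helper; the retry count and spacing here are illustrative choices, not BullMQ defaults:

```typescript
// Sketch: republish a cancel request a few times, since Pub/Sub offers
// no delivery guarantee. `publish` is any function that sends the message.
async function publishWithRetries(
  publish: () => Promise<unknown>,
  times = 3,
  delayMs = 500,
): Promise<void> {
  for (let i = 0; i < times; i++) {
    await publish();
    if (i < times - 1) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
```

Usage would look like `await publishWithRetries(() => redis.publish(CONTROL_CHANNEL, payload))`; since cancelJob on a worker that isn't processing the job is a no-op, duplicate deliveries are harmless.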

First, set up the worker to subscribe to the control channel:

import { Worker } from "bullmq";
import Redis from "ioredis";

const connection = { host: "localhost", port: 6379 };
const queueName = "my-queue";
const prefix = "bull"; // default BullMQ prefix

// Create a dedicated Redis subscriber for control signals.
// Important: A Redis connection in subscribe mode cannot be used
// for other commands, so we need a separate connection.
const subscriber = new Redis(connection);
const CONTROL_CHANNEL = `${prefix}:${queueName}:control`;

const worker = new Worker(
  queueName,
  async (job, token, signal) => {
    // Pass the signal to operations that support it.
    // If cancelled, the error propagates and the worker fails the job automatically.
    return await doExpensiveWork(job.data, signal);
  },
  { connection },
);

// Listen for remote control messages
subscriber.subscribe(CONTROL_CHANNEL);
subscriber.on("message", (channel, message) => {
  if (channel === CONTROL_CHANNEL) {
    try {
      const payload = JSON.parse(message);
      if (payload.action !== "cancel") {
        return;
      }

      const { jobId, reason } = payload;
      // cancelJob returns true if this worker was processing that job
      const cancelled = worker.cancelJob(jobId, reason);
      if (cancelled) {
        console.log(`Cancelled job ${jobId} from remote request`);
      }
    } catch (err) {
      console.error("Invalid cancel message:", err);
    }
  }
});

Then, from any other process (API server, admin dashboard, CLI tool, etc.), publish a cancel message:

import Redis from "ioredis";

const redis = new Redis({ host: "localhost", port: 6379 });
const queueName = "my-queue";
const prefix = "bull"; // must match the worker's prefix
const CONTROL_CHANNEL = `${prefix}:${queueName}:control`;

async function cancelJobRemotely(jobId: string, reason?: string) {
  await redis.publish(
    CONTROL_CHANNEL,
    JSON.stringify({
      action: "cancel",
      jobId,
      reason: reason ?? "Remote cancellation",
    }),
  );
  console.log(`Cancel request published for job ${jobId}`);
}

// Cancel from an API endpoint
app.post("/api/jobs/:id/cancel", async (req, res) => {
  await cancelJobRemotely(req.params.id, "Cancelled by user");
  res.json({ status: "cancel-requested" });
});

How It Works

  1. Every worker process subscribes to its queue-scoped control channel (for example: bull:my-queue:control)
  2. A cancellation request from any source is published as a message to that channel
  3. All worker processes receive the message and call worker.cancelJob(jobId)
  4. Only the worker actually processing that job will find a match, and cancelJob returns true
  5. The AbortSignal fires in the processor, and the job cleans up gracefully

Since Redis Pub/Sub delivers messages to all subscribers, this works regardless of how many worker processes you have or which one is handling the job. The overhead is usually negligible: the message is a small JSON payload, and cancelJob on a non-matching worker is an in-memory no-op.

Production-Ready Version

For a production deployment, you could encapsulate this pattern into a reusable helper:

import { Job, Worker, WorkerOptions } from "bullmq";
import Redis from "ioredis";

interface CancellableWorkerOptions extends WorkerOptions {
  controlChannel?: string;
}

function createCancellableWorker<T, R>(
  queueName: string,
  processor: (
    job: Job<T, R>,
    token?: string,
    signal?: AbortSignal,
  ) => Promise<R>,
  opts: CancellableWorkerOptions,
) {
  const prefix = String(opts.prefix ?? "bull");
  const controlChannel =
    opts.controlChannel ?? `${prefix}:${queueName}:control`;
  const worker = new Worker(queueName, processor, opts);

  // Dedicated subscriber connection
  const subscriber = new Redis(opts.connection as any); // ioredis connection options
  subscriber.subscribe(controlChannel);

  subscriber.on("message", (_channel, message) => {
    try {
      const payload = JSON.parse(message);
      if (payload.action !== "cancel") {
        return;
      }

      const { jobId, reason } = payload;
      worker.cancelJob(jobId, reason);
    } catch (err) {
      console.error("Invalid cancel message:", err);
    }
  });

  // Clean up subscriber when worker closes
  const originalClose = worker.close.bind(worker);
  worker.close = async (force?: boolean) => {
    await subscriber.unsubscribe(controlChannel);
    await subscriber.quit();
    return originalClose(force);
  };

  return { worker, controlChannel };
}

Usage:

const { worker, controlChannel } = createCancellableWorker(
  "video-processing",
  async (job, token, signal) => {
    return await transcodeVideo(job.data.videoUrl, signal);
  },
  { connection },
);

And to cancel from anywhere:

const redis = new Redis(connection);
await redis.publish(
  controlChannel,
  JSON.stringify({
    action: "cancel",
    jobId: "job-123",
    reason: "User cancelled upload",
  }),
);

Combining Cancellation with Graceful Shutdown

BullMQ's worker.close() is already graceful: it stops fetching new jobs and waits for all currently active processors to finish before closing Redis connections and cleaning up. In many cases that's all you need; the in-flight jobs complete normally, and the worker shuts down cleanly.

Cancellation becomes useful when you want to speed up that graceful shutdown. If your processors run long tasks (video transcoding, large data imports, etc.) and you can't afford to wait minutes for them to finish, cancelling active jobs lets the processors abort early so worker.close() returns faster. But this is a trade-off: cancelled jobs will be failed (and possibly retried), whereas letting them finish means no work is wasted.

Choose the approach that fits your use case:

const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    return await doWork(job.data, signal);
  },
  { connection },
);

process.on("SIGTERM", async () => {
  // Option 1: Just close gracefully — wait for active jobs to finish
  await worker.close();

  // Option 2: Cancel active jobs to speed up shutdown.
  // Pause first so the worker doesn't pick up the cancelled jobs again
  // before close() finishes.
  // await worker.pause();
  // worker.cancelAllJobs('Process shutting down');
  // await worker.close();

  process.exit(0);
});

Implementing Clean Job Timeouts

A job that runs forever is just as problematic as one that crashes. Timeouts and cancellation are two sides of the same coin: the processor still needs to stop cooperatively and clean up. The AbortSignal API makes this straightforward with AbortSignal.any().

Composing Signals with AbortSignal.timeout()

Modern Node.js provides AbortSignal.timeout() (v17.3+), which creates a signal that auto-aborts after a specified duration. Combined with AbortSignal.any() (v20.3+), you can merge the worker's cancellation signal with a timeout signal:

const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    // Create a timeout signal (30 seconds)
    const timeoutSignal = AbortSignal.timeout(30_000);

    // Combine timeout + external cancellation (when provided)
    const signals: AbortSignal[] = [timeoutSignal];
    if (signal) {
      signals.push(signal);
    }
    const combinedSignal = AbortSignal.any(signals);

    // Pass the combined signal. If it fires, the operation throws and
    // the worker catches it automatically to fail the job.
    const response = await fetch(job.data.url, { signal: combinedSignal });
    return await response.json();
  },
  { connection },
);
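
When diagnosing why a combined signal fired, you can inspect its reason: in Node.js, AbortSignal.timeout() aborts with a DOMException named "TimeoutError", while worker cancellation carries the reason passed to cancelJob. A small sketch (the isTimeout helper is ours, not a BullMQ API):

```typescript
// Sketch: distinguishing a timeout from an external cancellation by
// inspecting the aborted signal's reason.
function isTimeout(signal: AbortSignal): boolean {
  return (
    signal.aborted &&
    typeof signal.reason === "object" &&
    signal.reason !== null &&
    (signal.reason as { name?: unknown }).name === "TimeoutError"
  );
}
```

This pairs naturally with the retry rules above: a timeout might warrant a retry with a regular Error, while a user cancel does not.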

Per-Job Configurable Timeouts

In practice, different jobs need different timeouts. A video transcode shouldn't have the same limit as a thumbnail resize. Store the timeout in job data and create the signal dynamically:

// When adding jobs, specify the timeout
await queue.add("process-video", {
  videoUrl: "https://...",
  timeoutMs: 300_000, // 5 minutes for large videos
});

await queue.add("process-thumbnail", {
  imageUrl: "https://...",
  timeoutMs: 10_000, // 10 seconds for thumbnails
});

// In the worker
const worker = new Worker(
  "media-queue",
  async (job, token, signal) => {
    const timeoutMs = job.data.timeoutMs ?? 60_000; // Default: 1 minute
    const timeoutSignal = AbortSignal.timeout(timeoutMs);
    const signals: AbortSignal[] = [timeoutSignal];
    if (signal) {
      signals.push(signal);
    }
    const combinedSignal = AbortSignal.any(signals);

    return await processMedia(job.data, combinedSignal);
  },
  { connection },
);

Reusable Timeout Helper

Create a reusable utility that composes the worker signal with a timeout and passes the resulting signal to the underlying operation:

async function withTimeout<T>(
  operation: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
  parentSignal?: AbortSignal,
): Promise<T> {
  const timeoutSignal = AbortSignal.timeout(timeoutMs);
  const combinedSignal = parentSignal
    ? AbortSignal.any([parentSignal, timeoutSignal])
    : timeoutSignal;

  return await operation(combinedSignal);
}

// Usage in a worker
const worker = new Worker(
  "my-queue",
  async (job, token, signal) => {
    // Wrap any async operation with a timeout
    const result = await withTimeout(
      (combinedSignal) => fetchAndProcess(job.data.url, combinedSignal),
      30_000, // 30 second timeout
      signal,
    );
    return result;
  },
  { connection },
);

The helper is composable, so you can apply different timeouts to each phase of a job:

const worker = new Worker(
  "pipeline-queue",
  async (job, token, signal) => {
    // Different timeouts for different phases
    const downloaded = await withTimeout(
      (combinedSignal) => download(job.data.url, combinedSignal),
      10_000,
      signal,
    );
    const processed = await withTimeout(
      (combinedSignal) => process(downloaded, combinedSignal),
      60_000,
      signal,
    );
    const uploaded = await withTimeout(
      (combinedSignal) => upload(processed, combinedSignal),
      30_000,
      signal,
    );

    return { downloadSize: downloaded.length, outputUrl: uploaded.url };
  },
  { connection },
);

Best Practices

  1. Always use the event-based pattern for immediate responsiveness. Polling introduces latency between the cancel request and the processor reacting.

  2. Clean up resources in the abort handler. Close database connections, cancel HTTP requests, delete temporary files, and release anything else the job was using.

  3. Use UnrecoverableError for user-initiated cancellations. If a user explicitly cancelled a job, retrying it automatically would be unexpected behavior.

  4. Use regular Error for system-initiated cancellations. Shutdowns and transient issues should allow the job to be retried by another worker.

  5. Use a single control channel per queue. The convention {prefix}:{queueName}:control with an action field in each message keeps things simple now and extensible later. Any service that knows the prefix and queue name can publish control messages without custom wiring.

  6. Handle cleanup errors gracefully. If cleanup fails, log the error but don't let it mask the original cancellation.

  7. Combine with the lockRenewalFailed event. If a worker loses its Redis lock (due to network issues), cancel the affected jobs so they can be picked up by another worker:

worker.on("lockRenewalFailed", (jobIds) => {
  jobIds.forEach((id) => worker.cancelJob(id, "Lock renewal failed"));
});
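
Practice 6 above (don't let cleanup failures mask the cancellation) can be sketched as a small wrapper; the helper name cleanupSafely is ours:

```typescript
// Sketch: run each cleanup step, log failures, and never let a failing
// step prevent the remaining ones (or mask the original cancellation).
async function cleanupSafely(
  steps: Array<() => void | Promise<void>>,
): Promise<void> {
  for (const step of steps) {
    try {
      await step();
    } catch (err) {
      console.error("Cleanup step failed:", err);
    }
  }
}
```

Inside an abort handler, you would call cleanupSafely with the close/delete steps first, then reject with the cancellation error itself.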

What's Next

This article focused on Node.js, but job cancellation is equally important in Python and Elixir workers. In upcoming articles, we'll cover:

  • Cancelling jobs in BullMQ Python, using asyncio.CancelledError and task cancellation
  • Cancelling jobs in BullMQ Elixir, using OTP process signals and GenServer patterns

The core pattern remains the same across all languages: cooperative cancellation with proper resource cleanup, and Redis Pub/Sub for cross-process signaling.


Ready to try BullMQ? Check out the documentation to get started, or explore the cancellation guide for the full API reference.