Monday, December 4, 2023

Better queue markers in BullMQ v5

Reliable Queues in Redis™

When Bull was first created, it was implemented as a reliable and polling-free queue: 'reliable' in the sense that a job will never get lost or stuck, and 'polling-free' because workers would be notified by Redis™ when a new job was added to the queue. These features were achieved by using the BRPOPLPUSH command, a blocking command that waits for a new item to be added to a list, then pops it from that list and pushes it to another list atomically. In the case of BullMQ, this means moving a job id from the "wait" list to the "active" list.

```mermaid
stateDiagram-v2
    direction LR
    [*] --> Wait: Add Job
    Wait --> Active: BRPOPLPUSH
    Active --> Completed
    Active --> Failed
```
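For illustration, here is a minimal sketch of this pattern using ioredis. The key names are only illustrative, and this is not the actual Bull implementation:

```typescript
import Redis from "ioredis";

// Dedicated connection for the blocking call.
const connection = new Redis();

async function waitForNextJob(queueName: string): Promise<string | null> {
  // Block until a job id appears in the "wait" list, then atomically
  // move it to the "active" list (timeout 0 = block indefinitely).
  return connection.brpoplpush(
    `bull:${queueName}:wait`,
    `bull:${queueName}:active`,
    0
  );
}
```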

As the command is blocking, no polling is necessary, reducing the overhead of having to poll Redis™ for each worker waiting for a new job. This made Bull very fast and robust, contributing significantly to its success compared to other queueing libraries for NodeJS.

There is a problem with this mechanism though, which is that it fits well only for the simplest use case, like a straightforward LIFO or FIFO queue. However, as we started adding more features to the queue, like delayed jobs, repeatable jobs, priority jobs, rate limiting, etc., the mechanism became increasingly complex and inefficient. For instance, inserting priority jobs in order into the wait list has O(n) complexity and requires maintaining a separate set with all the job priorities (to compute the insertion point of the next added priority job).

```mermaid
stateDiagram-v2
    direction LR
    [*] --> Priorities: Add Prioritized Job
    Priorities --> Wait: Insert job
    Wait --> Active: BRPOPLPUSH
    Active --> Completed
    Active --> Failed
```
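To make the complexity point concrete, here is a rough sketch with made-up key names (not BullMQ's actual scripts) contrasting the two approaches: keeping a plain list ordered by priority needs an O(n) LINSERT (plus bookkeeping to find the pivot), while a sorted set accepts the same job with a single O(log n) ZADD:

```typescript
import Redis from "ioredis";

const redis = new Redis();

// Legacy idea: keep the "wait" list ordered by priority. Finding the
// insertion point and running LINSERT are both O(n) in the list length.
async function insertInOrder(pivotJobId: string, jobId: string) {
  await redis.linsert("wait", "BEFORE", pivotJobId, jobId);
}

// ZSET idea: a single ZADD keeps jobs ordered by priority in O(log n).
async function insertInPrioritySet(jobId: string, priority: number) {
  await redis.zadd("prioritized", priority, jobId);
}
```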

Introducing Queue Markers

One of the latest improvements in BullMQ is to store priority jobs in a separate ZSET instead of inserting them in order into the wait list. This is much more elegant, faster, and easier to maintain.

However, it requires a special mechanism to notify the workers when new prioritized jobs are available for processing. The mechanism uses special job IDs in the format 0:timestamp (which we call markers) that are pushed to the wait list to signal that jobs are available to be processed, just not in the wait list itself but in the priority set.

By the way, the timestamp is 0 for prioritized jobs, but we use the same mechanism for delayed jobs, where the timestamp indicates when the next delayed job is due to be processed. This allows the workers to idle until the right time. If a job producer adds a delayed job that needs earlier processing, we simply add a new marker with the updated timestamp.

```mermaid
stateDiagram-v2
    [*] --> Prioritized: Add Prioritized Job
    [*] --> Wait: Add / Update Marker
    Prioritized --> Active
    Wait --> Active: BRPOPLPUSH
    Active --> Completed
    Active --> Failed
    Active --> [*]: Remove Marker
```
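As a rough sketch of the idea (illustrative key names, not the actual Lua scripts): adding a prioritized job stores it in the priority ZSET and pushes a "0:0" marker to the wait list, while the next delayed job announces itself with a "0:timestamp" marker:

```typescript
import Redis from "ioredis";

const redis = new Redis();

// Legacy (v4-style) markers: fake job ids of the form "0:timestamp"
// pushed into the wait list. Key names are illustrative.
async function addPrioritizedJob(jobId: string, priority: number) {
  await redis.zadd("prioritized", priority, jobId);
  // Timestamp 0 means "there is work ready to be processed right now".
  await redis.lpush("wait", "0:0");
}

async function announceNextDelayedJob(nextTimestamp: number) {
  // For delayed jobs the timestamp tells idling workers how long they may sleep.
  await redis.lpush("wait", `0:${nextTimestamp}`);
}
```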

Handling Delayed Jobs

The marker mechanism is also excellent for managing rate-limited and delayed jobs in a more efficient and correct way. In older versions of BullMQ, we relied on a separate process that would notify workers of changes in the delayed set. This process ran on a separate class instance, with its own Redis™ connection, using global events to signal the availability of delayed jobs.

Using markers, we just need to keep the marker with the next delayed job's timestamp and use that value to wait, unless a new delayed or non-delayed job that needs to be processed earlier is added to the queue; in that case, simply adding a new marker wakes up a worker to take care of it.
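Sketched in code (again with illustrative key names rather than BullMQ's real scripts), the idea is roughly:

```typescript
import Redis from "ioredis";

const redis = new Redis();

// When a delayed job is added, publish an updated marker only if it is
// due earlier than everything already in the delayed set.
async function onDelayedJobAdded(jobId: string, processAt: number) {
  await redis.zadd("delayed", processAt, jobId);

  // If the new job is now at the head of the delayed set, workers need
  // to know about the earlier timestamp.
  const [head] = await redis.zrange("delayed", 0, 0);
  if (head === jobId) {
    await redis.lpush("wait", `0:${processAt}`);
  }
}
```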

New Marker Mechanism in BullMQ v5

This mechanism works well, and BullMQ v4 extensively uses it. However, managing the markers can be cumbersome in some edge cases. In fact, in very busy setups, it can lead to temporarily having many markers in the active list, causing some undesired side effects.

So, in BullMQ v5, we have introduced a different mechanism for handling markers. We have eliminated the BRPOPLPUSH command altogether, so jobs are now moved from any state to the active state exclusively by the "moveToActive" Lua script. This script picks the next job based on factors like priority, delay, rate limiting, and so on. Secondly, we have introduced a separate ZSET for signaling to the workers that jobs are waiting to be processed.

```mermaid
stateDiagram-v2
    [*] --> Prioritized: Add Prioritized Job
    [*] --> Delayed: Add Delayed Job
    [*] --> Wait: Add Standard Job
    [*] --> Marker: Add Marker
    Marker --> [*]: ZPOPMIN Marker
    Prioritized --> Active
    Wait --> Active
    Active --> Completed
    Active --> Failed
```
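Conceptually, the signaling side now boils down to a single ZADD on a dedicated key. The exact member and score encoding below is an assumption for illustration only; the real work happens inside the Lua scripts:

```typescript
import Redis from "ioredis";

const redis = new Redis();

// Sketch of the v5 signaling idea: the marker lives in its own ZSET and
// its score is the timestamp at which work becomes available.
async function announceWork(markerKey: string, availableAt = 0) {
  // Score 0 = "a job is ready right now"; a future timestamp = "the next
  // delayed job is due at that time".
  await redis.zadd(markerKey, availableAt, String(availableAt));
}
```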

Idling workers issue a BZPOPMIN command on the new "marker" key. As soon as a marker is pushed into this ZSET, a worker wakes up and, depending on the timestamp, either calls the "moveToActive" script or issues a new BZPOPMIN command if the timestamp is in the future, which indicates that no jobs are currently known to need processing before that timestamp (though new jobs could arrive at any time, hence the need for a blocking BZPOPMIN).
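A simplified sketch of the worker side could look like the following; the real worker runs the "moveToActive" script and handles many more cases, and the names here are illustrative:

```typescript
import Redis from "ioredis";

const blocking = new Redis(); // dedicated connection for blocking commands

// Simplified idle loop: block on the marker ZSET, then decide whether
// work is ready now or only at some point in the future.
async function idleUntilWork(markerKey: string, maxBlockSeconds: number) {
  const reply = (await blocking.bzpopmin(markerKey, maxBlockSeconds)) as
    | [key: string, member: string, score: string]
    | null;
  if (!reply) return; // timed out; the caller loops and blocks again

  const dueAt = Number(reply[2]); // BZPOPMIN returns [key, member, score]

  if (dueAt <= Date.now()) {
    // A job is ready: the real worker now runs the "moveToActive" script,
    // which resolves priority, delay and rate limiting atomically.
  } else {
    // The next known job is due in the future: block again, but no longer
    // than until dueAt, since an earlier marker may still arrive.
  }
}
```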

Waking up one worker at a time

The adopted approach implies that when a marker is posted to the marker ZSET, only one of the many workers waiting for jobs in a queue wakes up and attempts to process jobs. This could be suboptimal if several jobs are available for processing, for example when multiple jobs are added in bulk.

Using a Redis™ STREAM instead of a ZSET could wake up all idling workers simultaneously. Then, the workers would compete to fetch the next job from the queue and start processing. This approach would benefit scenarios where jobs are added in batches. However, we chose an approach based on minimizing unnecessary Redis™ commands in general. For instance, using a STREAM would wake up all the workers even if only one job was added, resulting in unnecessary executions of the "moveToActive" script.
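The behaviour is easy to observe with two clients blocked on the same key (a local Redis™ is assumed and the key name is made up): a single ZADD wakes exactly one of them, while the other keeps waiting until its timeout.

```typescript
import Redis from "ioredis";

// Small demonstration of the "one worker wakes up" behaviour.
async function demo() {
  const workerA = new Redis();
  const workerB = new Redis();
  const producer = new Redis();

  // Both workers block on the same marker key for up to 5 seconds...
  const a = workerA.bzpopmin("demo:marker", 5);
  const b = workerB.bzpopmin("demo:marker", 5);

  // ...but a single ZADD serves only one of them; the other times out.
  await producer.zadd("demo:marker", 0, "0");
  console.log(await Promise.all([a, b])); // exactly one reply is null

  await Promise.all([workerA.quit(), workerB.quit(), producer.quit()]);
}

demo();
```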

It's important to note that for busy queues with a constant flow of jobs, the marker mechanism is barely needed, as the workers never idle, ensuring jobs are processed as quickly as possible with the minimal number of Redis™ commands.

Migrating from v4 to v5

The new markers add a new key to the underlying Redis™ data structure, qualifying it as a breaking change. This is the only breaking change between v4 and v5, so it's the only feature you need to be concerned about when upgrading to v5.

We've designed the new mechanism to allow upgrading to BullMQ v5 even with existing queues using the legacy marker mechanism. Note that for the new mechanism to function correctly, all workers and queue instances must upgrade to v5. Otherwise, some workers might idle longer than necessary as they won't receive the markers.

Workers by default will not block for more than 10 seconds, so even if an older worker version is still running, the maximum delay before it picks up an incoming job would be about 10 seconds. This default can be overridden with the drainDelay option, so the delay could be longer if you have increased it.
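For reference, drainDelay is set per worker; the queue name, connection details, and the value below are just placeholders:

```typescript
import { Worker } from "bullmq";

// drainDelay (in seconds) bounds how long a worker blocks waiting for
// jobs before looping again.
const worker = new Worker(
  "myQueue",
  async (job) => {
    // process the job here
  },
  {
    connection: { host: "localhost", port: 6379 },
    drainDelay: 5, // placeholder value
  }
);
```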

Please refer to these migration hints if you are planning to migrate from an older BullMQ version.