Redis is a very popular in-memory database. In its early days it started as a contender to memcached, but as it gained features it became useful in many other domains, including session management, caching, rate limiting, real-time leaderboards, and pub/sub messaging.
In this blog post we try to answer the question of whether Redis is any good for queue management. Let's start by quickly summarizing some important features we need from a good queue system:

- Atomic delivery: a job is handed to exactly one worker at a time.
- Ordering: jobs come out in a well-defined order (FIFO, priority, etc.).
- Reliability: a job that has been picked up but not completed is not lost.

On top of that, we need to be able to scale the queue to multiple machines, distributing the load and recovering from machine failures.
Let's see if Redis is a good fit for this.
Deep inside the Redis documentation there is a command called [BRPOPLPUSH](https://redis.io/docs/latest/commands/brpoplpush/) (Blocking Right Pop Left Push), the blocking version of RPOPLPUSH (Right Pop Left Push). (As of Redis 6.2 it is deprecated in favor of the more general BLMOVE, but the semantics are the same.) What this command does is move one element from one list to another. The reason it is useful for queueing is that it performs this move as a single atomic operation while at the same time returning the element that was moved. For implementing a queue this is actually quite convenient: if you represent the queue as one list, and the active (in-progress) state of the queue as another, you can robustly implement a queue where you move elements from the queue to the active state, process them, and then remove them from the active state.
Something like this:
```javascript
const Redis = require("ioredis");

const redis = new Redis();

async function processQueue(queueKey, processingKey, handler) {
  try {
    while (true) {
      // Atomically move an item from the queue to the processing list
      const item = await redis.brpoplpush(queueKey, processingKey, 0);
      try {
        await handler(item);
        // Remove from processing after successful handling
        await redis.lrem(processingKey, 1, item);
      } catch (err) {
        console.error(`Error processing ${item}:`, err);
      }
    }
  } catch (err) {
    console.error("Queue processing error:", err);
  }
}

// Usage
processQueue("myQueue", "myProcessing", async (item) => {
  console.log("Processing item:", item);
  // Simulate processing time
  await new Promise((resolve) => setTimeout(resolve, 1000));
});
```
In this example we have a `processQueue` function that takes a `queueKey`, a `processingKey`, and a `handler` function. The `handler` function is called for each item in the queue. The `processQueue` function is a loop that moves items from the queue to the processing list and then processes them. If processing succeeds, the item is removed from the processing list. If an error occurs, the item is left in the processing list and the error is logged.
As simple as this example may be, it illustrates the inner workings of most queue systems: it is all about picking items from a list that has some kind of order (in the example the order is the insertion order, i.e. FIFO, but we could have a priority queue, a LIFO, and so on).
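To make the "some kind of order" part concrete: a LIFO only needs popping from the same end you push to, and a priority queue can be built on a Redis sorted set, where each job carries a numeric priority as its score and workers always pop the lowest score first (ZADD plus ZPOPMIN, or BZPOPMIN for the blocking form). Here is a rough sketch of that idea, again using a tiny in-memory stand-in for the sorted set so the snippet runs on its own:

```javascript
// Priority-queue sketch on top of sorted-set semantics: ZADD inserts
// a member with a score, ZPOPMIN removes and returns the member with
// the lowest score. FakeRedis mimics just those two commands for a
// single key, which is enough to show the ordering behaviour.
class FakeRedis {
  constructor() {
    this.entries = []; // { score, member }, kept sorted by score
  }
  async zadd(key, score, member) {
    this.entries.push({ score, member });
    this.entries.sort((a, b) => a.score - b.score);
  }
  async zpopmin(key) {
    const entry = this.entries.shift();
    // Real Redis returns [member, score] (both as strings)
    return entry ? [entry.member, String(entry.score)] : [];
  }
}

async function main() {
  const redis = new FakeRedis();
  // Lower score = higher priority
  await redis.zadd("prioQueue", 5, "cleanup-job");
  await redis.zadd("prioQueue", 1, "urgent-job");
  await redis.zadd("prioQueue", 3, "report-job");

  const order = [];
  let popped;
  while ((popped = await redis.zpopmin("prioQueue")).length > 0) {
    order.push(popped[0]);
  }
  return order; // → [ 'urgent-job', 'report-job', 'cleanup-job' ]
}

main().then((order) => console.log("Processing order:", order));
```

Note that with a plain sorted set there is no single atomic "move to processing list" like BRPOPLPUSH gives us for lists, so real priority queues on Redis typically wrap the pop-and-track step in a Lua script to keep it atomic.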
With a test like this, even standard Redis, without any multicore capabilities, can process an impressive number of jobs. Check this benchmark, for example, which I ran on my local Apple M2 Pro machine:
However, this is actually not the fastest we could go. Note that in this example we process jobs serially: we pick one job and do not pick the next until the first has completed. If jobs take a long time to process, it would be nice to be able to process several in parallel.

Note that in this particular example, written in JavaScript, we must assume that whatever time-consuming work the handler does is IO bound. If it performed a very heavy CPU operation, for example a loop doing some very large calculation, we would not get any parallelism: the NodeJS event loop would be so busy that nothing else could run. A solution for this would be to run every message handler in a separate worker thread, but that is too involved to cover in this post and not really needed to understand the basic concepts.
Now, check this new version of the simple `processQueue` function we implemented above, this time enabling concurrency:
```javascript
const Redis = require("ioredis");

const redis = new Redis();

async function processQueue(queueKey, processingKey, handler, concurrency = 1) {
  async function worker() {
    while (true) {
      // Atomically move an item from the queue to the processing list
      const item = await redis.brpoplpush(queueKey, processingKey, 0);
      try {
        await handler(item);
        // Remove from processing after successful handling
        await redis.lrem(processingKey, 1, item);
      } catch (err) {
        console.error(`Error processing ${item}:`, err);
      }
    }
  }

  try {
    // Spawn `concurrency` workers and wait on all of them, so an
    // error thrown by any worker is actually caught here
    const workers = Array.from({ length: concurrency }, () => worker());
    await Promise.all(workers);
  } catch (err) {
    console.error("Queue processing error:", err);
  }
}
```
Let's run the same benchmark as before, but this time with different concurrency levels: