10-08-2025
This year I created streaQ, a fast, fully-typed, distributed task queue for asynchronous Python jobs, built on Redis Streams for speed and fault tolerance. In the process, I had to make several interesting architectural decisions, which I've decided to share here. After a quick overview of how streaQ is used, we'll take a look at the underlying architecture and some of the trade-offs made.
You can install streaQ like so:
$ pip install streaq
To get started, create a Worker object and point it to your Redis instance:
from streaq import Worker
worker = Worker(redis_url="redis://localhost:6379")
Tasks can now be registered to the worker like this:
import asyncio
@worker.task()
async def sleeper(time: int) -> int:
    await asyncio.sleep(time)
    return time
streaQ also supports cron jobs which run at regular intervals:
@worker.cron("* * * * mon-fri") # every minute on weekdays
async def cronjob() -> None:
    print("Nobody respects the spammish repetition!")
While cron jobs will be enqueued automatically by running workers, normal tasks must be enqueued manually:
async with worker:
    await sleeper.enqueue(3)
    # enqueue returns a task object that can be used to get results/info
    task = await sleeper.enqueue(1).start(delay=3)
    print(await task.info())
    print(await task.result(timeout=5))
That's it! To run this, put the code in a file called example.py and spin up a worker:
$ streaq example.worker
and simultaneously queue up some tasks:
$ python example.py
If everything is installed correctly, you should see output like this:
[INFO] 2025-09-23 02:14:30: starting worker 3265311d for 2 functions
[INFO] 2025-09-23 02:14:35: task sleeper □ cf0c55387a214320bd23e8987283a562 → worker 3265311d
[INFO] 2025-09-23 02:14:38: task sleeper ■ cf0c55387a214320bd23e8987283a562 ← 3
[INFO] 2025-09-23 02:14:40: task sleeper □ 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
[INFO] 2025-09-23 02:14:41: task sleeper ■ 1de3f192ee4a40d4884ebf303874681c ← 1
[INFO] 2025-09-23 02:15:00: task cronjob □ 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
Nobody respects the spammish repetition!
[INFO] 2025-09-23 02:15:00: task cronjob ■ 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
TaskInfo(fn_name='sleeper', enqueue_time=1751508876961, tries=0, scheduled=datetime.datetime(2025, 7, 3, 2, 14, 39, 961000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
TaskResult(fn_name='sleeper', enqueue_time=1751508876961, success=True, result=1, start_time=1751508880500, finish_time=1751508881503, tries=1, worker_id='ca5bd9eb')
If you want to learn more about streaQ's features and API, check out the documentation!
Fundamental to streaQ's design is Redis Streams, a data structure that stores ordered messages, each with an ID and key-value fields. A stream allows producers to add entries and consumers to read them in a sequential fashion. Thanks to the "consumer groups" feature, multiple consumers can share processing responsibilities without duplicating work, as each message is delivered to exactly one consumer.
Once a message has been delivered to a consumer, Redis maintains a list of pending messages—that is, messages that have been delivered but not yet acknowledged. Consumers explicitly acknowledge messages once processed, and Redis then removes them from the list. If a consumer crashes, other consumers can inspect pending messages and reclaim them for reprocessing. This model enables horizontal scaling, fault tolerance, and ordered, at-least-once message delivery—making streams with consumer groups ideal for distributed task processing.
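To make these delivery semantics concrete, here's a toy pure-Python model of a stream with a consumer group. This is not real Redis and not streaQ's code, just an illustration of the deliver/ack/claim lifecycle (all names here are made up):

```python
import itertools


class ToyConsumerGroup:
    """Toy model of a Redis Stream with one consumer group."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.stream = []   # undelivered (id, payload) entries
        self.pending = {}  # id -> consumer (delivered but unacknowledged)

    def xadd(self, payload):
        entry_id = next(self._ids)
        self.stream.append((entry_id, payload))
        return entry_id

    def xreadgroup(self, consumer):
        """Deliver the next entry to exactly one consumer."""
        if not self.stream:
            return None
        entry_id, payload = self.stream.pop(0)
        self.pending[entry_id] = consumer  # track in the pending list
        return entry_id, payload

    def xack(self, entry_id):
        """Acknowledge a processed entry, removing it from pending."""
        self.pending.pop(entry_id, None)

    def xautoclaim(self, dead_consumer, new_consumer):
        """Reclaim a crashed consumer's pending entries for reprocessing."""
        claimed = [i for i, c in self.pending.items() if c == dead_consumer]
        for entry_id in claimed:
            self.pending[entry_id] = new_consumer
        return claimed
```

Each entry goes to exactly one consumer, unacked entries stay visible in the pending list, and a dead consumer's entries can be claimed by a healthy one; that's the at-least-once guarantee in miniature.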
Let's see how this maps onto a task-queuing library like streaQ.
Another nice feature of Redis Streams is the ability to do blocking reads. A naive way to read a queue is to poll it at regular intervals to see if there are queued messages. However, this comes with a performance penalty and means messages will on average wait a bit before getting picked up. XREADGROUP with BLOCK avoids these problems nicely as the command will not return until messages arrive, and upon arrival will return immediately.
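The same trade-off shows up in any producer/consumer setup. Here's a rough asyncio analogy (illustrative only, no Redis involved) contrasting polling with a blocking read:

```python
import asyncio


async def polling_consumer(queue: asyncio.Queue, interval: float = 0.05):
    """Naive approach: poll at a fixed interval, paying latency on average."""
    while True:
        if not queue.empty():
            return queue.get_nowait()
        await asyncio.sleep(interval)


async def blocking_consumer(queue: asyncio.Queue, timeout: float = 0.5):
    """Blocking read: wakes as soon as an item arrives, like XREADGROUP with BLOCK."""
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None  # nothing arrived within the timeout


async def main():
    queue = asyncio.Queue()
    consumer = asyncio.create_task(blocking_consumer(queue))
    await asyncio.sleep(0.1)
    await queue.put("task")  # arrives mid-wait; the consumer wakes immediately
    return await consumer


print(asyncio.run(main()))  # task
```

The blocking variant gets the best of both worlds: zero pickup latency when work arrives, plus a bounded wait so the caller can still do periodic housekeeping.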
So the basic queue layout is as follows:
1. At any point, tasks can be added to the stream with XADD.
2. Workers listen to the stream for new tasks (XREADGROUP with BLOCK).
3. XREADGROUP returns the added messages in the worker.
4. The worker executes the tasks it received.

If at some point during step #4 the worker shuts down unexpectedly, the task can eventually be retried via XAUTOCLAIM, and as tasks finish workers mark them as completed with XACK and XDEL.
Great, now we have the main queue sketched out! But what if we want to defer a task's execution to a later time? For that, we can use a different Redis structure: a sorted set. In streaQ, the deferred queue is simply a mapping of task ID to its scheduled execution timestamp in milliseconds. Tasks can be scheduled with ZADD, and we can check which tasks are ready to run by calling ZRANGEBYSCORE with the current timestamp. Finally, we add those tasks to the main queue and they'll get picked up by a worker.
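The mechanics are easy to mimic in plain Python. This is just a stand-in for the ZADD/ZRANGEBYSCORE semantics described above, not streaQ's actual code:

```python
def zadd(deferred: dict, task_id: str, run_at_ms: int) -> None:
    """Schedule a task: sorted-set member = task ID, score = timestamp in ms."""
    deferred[task_id] = run_at_ms


def zrangebyscore(deferred: dict, now_ms: int) -> list[str]:
    """Return all tasks whose scheduled time has passed, in score order."""
    return sorted((tid for tid, ts in deferred.items() if ts <= now_ms),
                  key=deferred.get)


deferred = {}
zadd(deferred, "task-1", 1_000)
zadd(deferred, "task-2", 5_000)
ready = zrangebyscore(deferred, now_ms=2_000)  # only task-1 is due
for task_id in ready:
    del deferred[task_id]  # in Redis: ZREM, then XADD to the main stream
print(ready)  # ['task-1']
```

Using the timestamp as the score is what makes the "which tasks are ready?" query a single range lookup rather than a scan of the whole set.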
Adding this to the previous layout, we get:
1. At any point, tasks can be added to the stream (XADD) or scheduled for later (ZADD).
2. Workers listen to the stream with a 500ms timeout (XREADGROUP with BLOCK 500).
3. XREADGROUP returns the added messages in the worker, or nothing if it timed out.
4. The worker executes any tasks it received.
5. The worker moves ready scheduled tasks to the stream (ZRANGEBYSCORE).
6. The worker reclaims stale tasks from dead workers (XAUTOCLAIM).

The idea behind the timeout is to be able to do all of these steps in the worker's main loop. That way, even if no new tasks are added, we can still schedule delayed tasks, reclaim stale tasks, and so on. The 500ms timeout is based on streaQ's support of cron jobs, which run as frequently as once per second, but could vary.
At this point, we have a functional, robust queue architecture. Let's implement a couple other features to flesh it out, starting with task abortion. To coordinate abortion, we can create a set in Redis containing the IDs of aborted tasks, and to abort a task we simply add its ID to the set via SADD. Every loop iteration, each worker can fetch the set with SMEMBERS and cancel any tasks present on the worker.
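A stripped-down sketch of that cancellation step might look like this (hypothetical names, not streaQ's implementation; the aborted set here is hard-coded where the real worker would fetch it with SMEMBERS):

```python
import asyncio


async def cancel_aborted(running: dict[str, asyncio.Task], aborted: set[str]):
    """Each loop iteration, cancel any local task whose ID is in the abort set."""
    for task_id in aborted & running.keys():
        running[task_id].cancel()  # in streaQ the IDs come from SMEMBERS
    await asyncio.sleep(0)         # let cancellations propagate


async def main():
    async def sleeper():
        await asyncio.sleep(60)

    running = {"abc123": asyncio.create_task(sleeper())}
    aborted = {"abc123"}           # someone called SADD with this task's ID
    await cancel_aborted(running, aborted)
    await asyncio.sleep(0.01)
    return running["abc123"].cancelled()


print(asyncio.run(main()))  # True
```

Since every worker checks the same set, the abort takes effect no matter which worker happens to be running the task.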
Next, let's make it so each worker gets passed a value for concurrency, the maximum number of tasks it can run at once. Additionally, we'll allow workers to "pre-fetch" a number of tasks beyond their max concurrency, which will be queued locally. This minimizes round trips to Redis and increases efficiency a bit. In order for this to work, we'll need to keep track of and limit the number of running tasks, which a semaphore does nicely.
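This bookkeeping can be sketched with an asyncio.Semaphore. This is a simplified model, not streaQ's implementation, and CONCURRENCY/PREFETCH are made-up constants:

```python
import asyncio

CONCURRENCY = 2  # max tasks running at once
PREFETCH = 3     # extra tasks buffered locally beyond that


def tasks_to_fetch(running: int, queued_locally: int) -> int:
    """How many new messages to request from Redis (the COUNT argument)."""
    return CONCURRENCY + PREFETCH - running - queued_locally


async def run_with_limit(sem: asyncio.Semaphore, coro):
    async with sem:  # the semaphore enforces the concurrency cap
        return await coro


async def main():
    sem = asyncio.Semaphore(CONCURRENCY)
    # 5 tasks submitted, but at most CONCURRENCY run at any moment
    return await asyncio.gather(
        *(run_with_limit(sem, asyncio.sleep(0, result=i)) for i in range(5))
    )


print(tasks_to_fetch(running=2, queued_locally=1))  # 2
print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```

The fetch count shrinks as the worker fills up, so a busy worker naturally stops pulling messages that an idle worker could take instead.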
Here's our final queue design:
1. Worker calculates the number of tasks to fetch, N: concurrency + prefetch - running - queued_locally.
2. If N == 0, try up to 0.5s to acquire the semaphore; if acquired, recalculate N.
3. If N > 1, worker checks for stale tasks to reclaim (XAUTOCLAIM with COUNT N). If any tasks were reclaimed, reduce N by that amount.
4. If N > 1 still, worker listens to the stream with a (0.5 - elapsed_time) second timeout (XREADGROUP with BLOCK ? and COUNT N).
5. XREADGROUP returns the added messages in the worker, or nothing if it timed out.
6. The worker executes any tasks it received.
7. The worker moves ready scheduled tasks to the stream (ZRANGEBYSCORE).
8. The worker fetches the set of aborted task IDs (SMEMBERS) and cancels any running locally.

Take a look at streaQ's main loop implementation. You should see that it follows this pattern pretty closely! The main differences are the use of Lua scripts and pipelining to streamline some of the work, as well as support for multiple priority queues, which essentially repeats many of these steps for each queue used.