A critical component of the Nextdoor software stack is our distributed task queueing system, which processes millions of asynchronous tasks daily. Examples of these tasks include sending content notifications to millions of neighbors across the country, building search indexes, and running other time-consuming processes that should be decoupled from our interactive web and mobile apps (iOS and Android).
Our task system has two pieces: a message broker (queue) and a set of task worker processes. Like many others, we used RabbitMQ as our message broker. Since we use Python/Django for much of our application, we selected Celery for our task workers. These popular open-source projects gave us a lot of leverage when we were small. However, as we gained traction and our user base grew, we began to suffer from stability issues with Celery, which led to a lot of firefighting. Despite plenty of advice from other Celery users, including our friends at Instagram, and great support from Ask Solem himself (the author of Celery), we continued to encounter issues.
To reduce our operational overhead, we decided to replace our self-hosted RabbitMQ cluster with Amazon SQS. SQS is simple to understand, highly scalable, and fully managed by Amazon. Stabilizing Celery, however, proved to be a much more daunting task given the complexity of its codebase, so much so that we decided to replace Celery completely with our own homegrown project, simply called Taskworker.
Our design decisions for Taskworker were guided by comprehensive data analysis on our production logs, micro-benchmarking, and production load simulation.
We faced three major problems with our use of Celery.
Celery workers proved to be unstable at our scale. Workers would often hang mysteriously, and the complexity of the Celery codebase made them difficult to troubleshoot. Furthermore, Celery’s SQS support was classified as experimental and had not yet been blessed for production use. We saw quite a few duplicate messages in our benchmark with Celery+SQS.
Celery workers used compute resources inefficiently. Many of our worker nodes were either underutilized or overloaded, since Celery doesn’t support priority queues. We attempted to mimic a priority queuing system by statically assigning worker nodes to queues designated by task priority and autoscaling workers based on queue depth. This proved to be inefficient.
The latency for Celery workers to process a task was often very high. This was ultimately a manifestation of the previous two problems combined (i.e., unstable workers and inefficient resource utilization). When worker nodes would hang, they weren’t available to process tasks and our queues would eventually back up.
Motivated by the above three problems, we set three goals for Taskworker:
Simple. The system needs to be easy to reason about and to troubleshoot. We achieve this by using simple abstractions and limiting functionality to only what we need.
Efficient. The system should make as efficient use of compute resources as possible.
Scalable. The system should be fully distributed and horizontally scalable. The only operation for scaling up is to boot up additional machines. Again, simple!
With these goals in mind, we came up with a design for a task manager that was simple enough to express in the following block of Python pseudocode (omitting error handling and retry logic):
queue = select_queue()
tasks = queue.get_tasks()
for task in tasks:
    ...  # loop body is cut off in the source; presumably it runs the task
Under the hood, we run a set of Taskworker processes on each worker node with each process running a loop as shown above. All processes are totally independent from one another and autonomously make decisions on which queue to service by calling the select_queue() function, which will be described in detail in the next section.
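The loop can be exercised without SQS. The following is a minimal sketch, not Taskworker's actual code: `InMemoryQueue`, `run_once()`, and the "first non-empty queue" policy inside `select_queue()` are hypothetical stand-ins, and a task is assumed to be any zero-argument callable.

```python
from collections import deque
from typing import Callable, Deque, List, Optional

Task = Callable[[], None]  # assumption: a task is any zero-argument callable


class InMemoryQueue:
    """Hypothetical stand-in for an SQS queue; not Taskworker's real class."""

    def __init__(self, name: str, batch_size: int = 10) -> None:
        self.name = name
        self.batch_size = batch_size
        self._tasks: Deque[Task] = deque()

    def put(self, task: Task) -> None:
        self._tasks.append(task)

    def get_tasks(self) -> List[Task]:
        """Remove and return up to batch_size tasks."""
        batch: List[Task] = []
        while self._tasks and len(batch) < self.batch_size:
            batch.append(self._tasks.popleft())
        return batch

    def __len__(self) -> int:
        return len(self._tasks)


def select_queue(queues: List[InMemoryQueue]) -> Optional[InMemoryQueue]:
    """Placeholder policy: first non-empty queue (not the real algorithm)."""
    for queue in queues:
        if len(queue) > 0:
            return queue
    return None


def run_once(queues: List[InMemoryQueue]) -> int:
    """Run one iteration of the worker loop; return how many tasks ran."""
    queue = select_queue(queues)
    if queue is None:
        return 0
    tasks = queue.get_tasks()
    for task in tasks:
        task()
    return len(tasks)
```

A real worker process would call something like `run_once()` in an endless loop and would also delete each message from SQS after the task succeeds; both are left out here, along with error handling and retries.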
Autonomous worker allocation
Taskworker pulls batches of tasks from SQS queues. The select_queue() function makes a local decision on which queue it should get tasks from, based on a numeric priority value of queues. We prefer to process tasks from higher priority queues, but we also need to avoid starving lower priority ones.
We benchmarked a dozen different algorithms by simulating production workloads and ended up with a variant of a lottery algorithm, which we’ve called “Reduced Lottery”, shown as follows:
candidate_queues = get_all_queues()
while not candidate_queues.empty():
    queue = run_lottery(candidate_queues)
The Reduced Lottery algorithm met our goal: it prefers higher priority queues, while avoiding starving lower priority queues by introducing randomness. And it’s simple!
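One plausible reading of the algorithm is sketched below. It rests on two assumptions that the text does not confirm: each queue's chance of winning a draw is proportional to its numeric priority, and a queue that wins but turns out to be empty is removed from the candidate set before the next draw (the "reduced" part). The `priorities` and `depths` dictionaries are hypothetical inputs standing in for real queue state.

```python
import random
from typing import Dict, Optional


def run_lottery(weights: Dict[str, int], rng: random.Random) -> str:
    """Pick one queue name with probability proportional to its weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


def select_queue(
    priorities: Dict[str, int],
    depths: Dict[str, int],
    rng: Optional[random.Random] = None,
) -> Optional[str]:
    """Return the name of a non-empty queue, or None if all are empty.

    priorities maps queue name -> positive priority (assumed lottery weight).
    depths maps queue name -> number of waiting tasks (assumed known locally).
    """
    rng = rng or random.Random()
    candidates = dict(priorities)  # copy so the caller's mapping is untouched
    while candidates:
        name = run_lottery(candidates, rng)
        if depths.get(name, 0) > 0:
            return name
        del candidates[name]  # drop the empty queue and draw again
    return None
```

Under these assumptions, a high priority queue wins most draws, yet a low priority queue with pending work still wins occasionally, so it is not starved.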
Put tasks in the right queue
We manage a dozen or more different classes of queues. Each queue hosts tasks with the same priority (determined by business importance) and similar running time (based on their run history). We configure settings at the queue level, including priority, SQS visibility timeout, and batch size to fetch tasks in one task processing loop.
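The queue-level settings listed above could be captured in a small configuration record. This is a sketch under assumptions: the `QueueConfig` class, its field names, and the example values are hypothetical. The validation limits reflect SQS itself, which returns at most 10 messages per receive call and allows visibility timeouts from 0 to 43,200 seconds (12 hours); the sketch assumes one receive call per batch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueConfig:
    """Hypothetical per-queue settings; field names are illustrative."""

    name: str
    priority: int              # higher means more important
    visibility_timeout_s: int  # should exceed the tasks' typical run time
    batch_size: int            # tasks fetched per processing loop

    def __post_init__(self) -> None:
        if self.priority <= 0:
            raise ValueError("priority must be positive")
        if not 0 <= self.visibility_timeout_s <= 43200:
            raise ValueError("SQS visibility timeout must be 0..43200 seconds")
        if not 1 <= self.batch_size <= 10:
            raise ValueError("SQS returns at most 10 messages per receive call")


# Illustrative values only; the real queues and settings are not published.
EXAMPLE_QUEUES = [
    QueueConfig("notifications", priority=10, visibility_timeout_s=60, batch_size=10),
    QueueConfig("search-index", priority=2, visibility_timeout_s=900, batch_size=1),
]
```

Grouping tasks with similar run times in one queue makes a single visibility timeout workable: it can be set just above the expected run time, so a crashed worker's tasks reappear quickly without healthy tasks being redelivered mid-run.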
SQS follows an “at-least-once” semantic when delivering tasks to workers, which forces our tasks to be idempotent in case they’re potentially delivered to multiple workers. We decided to shift the responsibility of ensuring idempotence to developers of tasks, per the end-to-end principle: applications (i.e., tasks) know better than the infrastructure (i.e., Taskworker) how to guarantee task idempotence. This also keeps the underlying infrastructure very simple.
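As an illustration of what idempotence asks of a task author, here is a minimal sketch of a notification task that tolerates duplicate delivery. The `NotificationSender` class and its deduplication key are hypothetical, and the in-memory set stands in for a durable store shared by all workers.

```python
from typing import List, Set, Tuple

Key = Tuple[int, int]  # (neighbor_id, content_id), an assumed dedup key


class NotificationSender:
    """Hypothetical idempotent task: sends each notification at most once."""

    def __init__(self) -> None:
        self._sent: Set[Key] = set()     # stand-in for a durable shared store
        self.deliveries: List[Key] = []  # records what was actually sent

    def send(self, neighbor_id: int, content_id: int) -> bool:
        """Send the notification unless it was already sent.

        Returns True if a notification went out, False for a duplicate.
        """
        key = (neighbor_id, content_id)
        if key in self._sent:
            return False  # duplicate delivery from the queue: do nothing
        self.deliveries.append(key)
        self._sent.add(key)
        return True
```

This only shows the shape of the check. With several workers, the lookup and the write would need to be a single atomic operation in the shared store, otherwise two workers holding the same message could both pass the check.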
SQS queues and Taskworkers are always versioned together for compatibility. This ensures that tasks produced by a particular version of code will always be consumed by workers running the same version of code. This also allows multiple versions of code to be run simultaneously and safely, as shown in the following diagram illustrating the release process of Taskworker.
The release engineer simply brings up a server farm of Taskworker nodes with a specific version of code. Done!
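One way to version queues together with code is to embed the code version in the queue name, so producers and workers on the same release derive the same name. The post does not say how Taskworker does this; the naming scheme and the `versioned_queue_name()` helper below are assumptions. The check reflects the SQS rule that queue names are at most 80 characters of letters, digits, hyphens, and underscores.

```python
import re

# SQS queue names: 1-80 characters from letters, digits, hyphens, underscores.
_SQS_NAME = re.compile(r"[A-Za-z0-9_-]{1,80}")


def versioned_queue_name(base: str, code_version: str) -> str:
    """Build a queue name shared by producers and workers of one release.

    The "<base>-<version>" scheme is an assumption, not Taskworker's format.
    """
    name = f"{base}-{code_version}"
    if not _SQS_NAME.fullmatch(name):
        raise ValueError(f"not a valid SQS queue name: {name!r}")
    return name
```

With such a scheme, `versioned_queue_name("notifications", "v41")` and `versioned_queue_name("notifications", "v42")` name two separate queues, so tasks published by one release are never picked up by workers running another.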
We simulated production workloads with Taskworker to decide how to provision worker capacity at different periods throughout the day. Because we have a fairly predictable workload, we simply set the auto-scaling rules to be based on time of day, which works quite well for us. Our load simulation for capacity planning turned out to be fairly accurate once Taskworker was deployed to production.
Our next goal was to safely migrate more than 100 different types of tasks in production from the old Celery/RabbitMQ system to the new Taskworker/SQS system.
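A time-of-day rule can be as simple as a lookup table. The sketch below is illustrative only: the `SCHEDULE` hours and node counts are invented, and `desired_workers()` is a hypothetical helper, not part of any autoscaling API.

```python
import bisect
from typing import List, Tuple

# (start_hour, worker_nodes): each entry applies from start_hour until the
# next entry begins. Values are made up for illustration.
SCHEDULE: List[Tuple[int, int]] = [(0, 4), (6, 12), (10, 8), (17, 16), (22, 6)]


def desired_workers(hour: int, schedule: List[Tuple[int, int]] = SCHEDULE) -> int:
    """Return the planned number of worker nodes for an hour of the day.

    The schedule must be sorted by start hour and begin at hour 0.
    """
    if not 0 <= hour <= 23:
        raise ValueError("hour must be in 0..23")
    starts = [start for start, _ in schedule]
    index = bisect.bisect_right(starts, hour) - 1  # last entry with start <= hour
    return schedule[index][1]
```

In practice the same table would be expressed as scheduled scaling actions in the cloud provider's autoscaling service rather than evaluated in application code.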
When we started to design Taskworker, we intentionally tried to make the client interface as similar as possible to Celery’s. This allowed us to minimize the number of changes to be applied to all tasks. We added our homegrown feature switches on a per-task basis to decide whether to publish tasks to RabbitMQ or SQS.
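A per-task switch of this kind might look like the following sketch. The `TaskPublisher` class, the switch dictionary, and the two publish callables are hypothetical; the real feature-switch system and client interface are not described in detail.

```python
from typing import Any, Callable, Dict

Publish = Callable[[str, Dict[str, Any]], None]  # (task_name, payload) -> None


class TaskPublisher:
    """Hypothetical router that picks a backend per task type."""

    def __init__(
        self,
        publish_rabbitmq: Publish,
        publish_sqs: Publish,
        use_sqs: Dict[str, bool],
    ) -> None:
        self._publish_rabbitmq = publish_rabbitmq
        self._publish_sqs = publish_sqs
        self._use_sqs = use_sqs  # task name -> switch state

    def publish(self, task_name: str, payload: Dict[str, Any]) -> str:
        """Publish a task and return the name of the backend that was used."""
        # Tasks without a switch stay on the old system by default.
        if self._use_sqs.get(task_name, False):
            self._publish_sqs(task_name, payload)
            return "sqs"
        self._publish_rabbitmq(task_name, payload)
        return "rabbitmq"
```

Defaulting to the old backend keeps the migration safe: a task moves only when its switch is explicitly turned on, and turning the switch off again rolls that one task back without touching the others.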
When we started to migrate tasks, we simply flipped on feature switches for tasks one by one, starting with lower priority tasks. We closely monitored all production metrics during this migration process to ensure a smooth transition, and ultimately it took us 20 days to complete. The following figure shows the progress of task migration, where yellow represents tasks running on the old Celery/RabbitMQ system and green represents tasks running on the new Taskworker/SQS system.
We’ve been running Taskworker in production for over three months and have been extremely happy with the results. We’ve encountered no stability issues with hung workers or crashes. With the same number of worker nodes as the previous Celery system, the average task-in-queue latency of the Taskworker system improved by 40x during busy hours, thanks to its true priority implementation, which makes resource utilization efficient.
There are still many potential improvements for Taskworker: a more efficient select_queue() algorithm, non-blocking task processing in the loop, dynamic priorities for queues, and more.
We are in the process of open sourcing Taskworker. Stay tuned! If you’re interested in working on these kinds of problems and other interesting infrastructure challenges, we’re hiring!