An introduction to Celery
Often in a web application, there is a subset of tasks that are long-running, resource intensive, and don’t have to be completed in real time. These tasks are perfect for offloading to an asynchronous task queue.
Overview
Queuing can be useful for a number of different reasons:
- It can improve the user experience by allowing the user to continue on to other things while a task completes, and to get results back once the task finishes
- It can improve resource utilization by running a queue on dedicated hardware, or by rate-limiting requests so resources are never exhausted
- It can improve application server worker throughput. Application server workers are most effective when they turn requests over quickly, and long-running requests can tie them up and back up the request queue. Moving that work to a separate task queue lets those requests return promptly.
- A task queue can be used to schedule tasks for different times
As with most things in life, queuing also has some downsides. First, getting results from a task is no longer as simple as having a function return them. A task result in a queuing system is more like a promise of a result than an actual result. A task queue also adds infrastructure, including workers and brokers, to your application’s deployment footprint. Finally, a task queue adds complexity to the mental model required to understand a project. That complexity is often worth it, but it’s definitely worth keeping in mind when implementing a queue.
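A queued task hands back something closer to a promise than a value. In Celery that object is an AsyncResult, which offers ready() to check on the task and get(timeout=...) to wait for the value. The sketch below shows the same interaction without a broker. It uses the standard library's concurrent.futures as a stand-in, and slow_square is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    # Hypothetical stand-in for a long-running, resource-intensive task
    time.sleep(0.1)
    return n * n


with ThreadPoolExecutor(max_workers=1) as executor:
    # submit() returns immediately with a Future: a promise of a result
    future = executor.submit(slow_square, 7)
    # The caller is free to do other work here, then checks on the promise
    print("finished yet?", future.done())
    # Block (up to 5 seconds) only when the value is actually needed
    result = future.result(timeout=5)

print(result)  # 49
```

The caller decides when, and whether, to wait. That decision is the extra bookkeeping a queue adds compared to a plain function call.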
What is Celery
Celery is a python library for creating tasks and interacting with a broker (such as RabbitMQ or SQS). It’s not a native queuing system by itself; rather, it makes the process of creating a queuing system easier for you. Celery is open source and released under the New BSD license, which means you can use it freely as long as you follow the relatively simple restrictions of the license.
Types of tasks and Celery
Several kinds of tasks are logical candidates for queuing with celery:
- Generating bulk emails (although, in most cases, the actual sending of email is already handled by a queue)
- Creating thumbnail versions of uploaded images
- Converting video
- Generating a PDF
- Re-computing ratings (or other forms of database de-normalization)
- Spam filtering and/or checking (these APIs can be slow, and usually posts don’t have to show up in real-time)
- Retrying a failure asynchronously: a good example of this would be an external call where the service might have a temporary failure. You could queue a retry for some period later and increase the probability it would succeed.
- Scheduling: send an email 10 minutes after someone leaves a comment, leaving them the opportunity to edit it in a grace period
- Indexing: Updating a search index when source data changes
- External calls: unsubscribing, for instance. In my experience, these APIs can be notoriously unreliable and slow, so there’s no reason to keep the user from proceeding until the call finishes
There are also a few types of tasks that one should avoid using a queuing mechanism for:
- Processing payment (if you’re blocking an order should payment fail)
- Address validation (if the order should be blocked when validation fails)
- Replacing cron (if you don’t need to; although a task queue can function similarly to scheduled tasks or cron, there’s really not much benefit to replacing cron with a queue if you’re not using its more advanced features)
- As a multithreading mechanism: although you can use a queue to fire off a million records for sorting, it’s more efficient (less overhead) to use a true multithreading library for something like this
- Tasks that aren’t that slow or frequent: don’t incur the complexity involved in a queuing system if you don’t need to
- Logging: you want logs saved in order, as things finish; queuing logging is a bad idea
- If you notice you fire off a task to the queue, then stall for it to finish, it’s probably a sign that the task you’re sending isn’t a great fit for a queue
Basic concepts
- Task queue: A set of tasks to be run. There can be multiple named queues defined in an application
- Task: Essentially a python function; a unit of work that can be completed given a set of parameters
- Message: Essentially the parameters to a python function. Will also include any task-specific parameters like delay.
- Broker: A “database” of messages; where a message is sent when a task is called (SQS, for example)
- Worker: A daemon that runs and looks for messages; when it finds them, it runs tasks (celery worker)
As you can see from the diagram above, a user initiates a request to a particular web server. The web server then creates a message (a request for a task to be performed) on the broker. Celery workers consume incoming messages from the broker and run the corresponding tasks. Results are then stored in a result backend, which may or may not be the same service as the broker. The application server can then (optionally) fetch the results of the task from that backend.
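The flow above can be modelled in a few lines of plain Python. This is a toy, in-process sketch, not how Celery is implemented:

- a queue.Queue plays the broker
- a thread plays the worker
- a dict plays the result backend

The task registry, task names, and task ids are hypothetical.

```python
import queue
import threading

# Toy model (not Celery): broker = message queue, worker = thread,
# result backend = dict keyed by task id.
broker = queue.Queue()
results = {}
results_lock = threading.Lock()

TASKS = {"add": lambda x, y: x + y}  # hypothetical task registry


def worker():
    while True:
        message = broker.get()        # blocks until a message arrives
        if message is None:           # sentinel: shut the worker down
            broker.task_done()
            break
        task_id, name, args = message
        value = TASKS[name](*args)    # run the task
        with results_lock:
            results[task_id] = value  # store the result for later lookup
        broker.task_done()


t = threading.Thread(target=worker)
t.start()

# The "web server" enqueues messages and returns without waiting
broker.put(("task-1", "add", (2, 3)))
broker.put(("task-2", "add", (10, 20)))
broker.put(None)

broker.join()  # demo only: wait until every message has been processed
t.join()
print(results)  # {'task-1': 5, 'task-2': 30}
```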
Brokers
- RabbitMQ: The best-supported broker for celery. Has additional support for monitoring. Stable. Specifically designed to be a message broker.
- Redis: An in-memory data store. Stable.
- MongoDB: A NoSQL database. Support is experimental.
- Beanstalk: An in-memory work queue service.
- Amazon SQS: A hosted message service. Support is experimental. I’ve personally used this with no issues in production.
- CouchDB: A NoSQL database. Support is experimental.
- ZooKeeper: An Apache project for distributed coordination. Support is experimental.
- Django DB: Uses the Django ORM to interact with a database. Support is experimental.
- SQLAlchemy: Uses the SQLAlchemy ORM to interact with a database. Support is experimental.
- IronMQ: A cloud-based message queue. They provide their own libraries for celery.
Calling a task
Now that the basic concepts are in place, let’s look at a real-world example of calling a task. Below is a typical Django admin action that calls a task (creates a message in the broker):
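from django.contrib import messages

from .tasks import my_task  # assumes my_task is defined in this app's tasks.py


def do_something_that_takes_a_while(modeladmin, request, queryset):
    email = request.user.email
    # Send plain, JSON-serializable ids rather than the lazy QuerySet itself
    my_task.delay(list(queryset.values_list('id', flat=True)), [email])
    messages.add_message(
        request,
        messages.INFO,
        "Your task results will be emailed to {} when completed.".format(email),
    )


do_something_that_takes_a_while.short_description = "Complete Task"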
Pretty simple: we call the task function’s ‘delay’ method, and the task is fired off asynchronously. (‘delay’ is a shortcut for ‘apply_async’, which also accepts execution options.)
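Calling delay() serializes the task’s arguments into a message for the broker, and recent Celery versions use JSON by default. This is why plain ids and lists travel well, while ORM instances or lazy QuerySets do not. The simplified sketch below illustrates the point. The message layout and the myapp.tasks.my_task name are illustrative assumptions, not Celery’s wire format.

```python
import json

# Simplified message: the task's name plus its arguments. Celery's real
# protocol adds headers (task id, retries, eta, ...); this only shows why
# the arguments themselves must be serializable.
object_ids = [1, 2, 3]
emails = ["user@example.org"]
message = json.dumps({"task": "myapp.tasks.my_task", "args": [object_ids, emails]})
print(message)


class FakeModelInstance:
    """Hypothetical stand-in for an ORM object or a lazy QuerySet."""

    pk = 1


def is_json_serializable(value):
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


print(is_json_serializable(object_ids))           # True
print(is_json_serializable(FakeModelInstance()))  # False
```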
Writing a task
As mentioned earlier, writing a task is very similar to writing a plain python function; the only additional code involved is a task decorator. Here we use “@shared_task”, which Celery recommends for tasks in Django apps. Older examples use the “@task” decorator from “celery import task”, which was removed in Celery 5:
from celery import shared_task
from django.core.mail import send_mail


@shared_task
def my_task(object_ids, emails):
    # TODO: do some long running task here (re-fetch objects by id)
    # next, send the email results to the user
    send_mail(
        'Your task has completed!',
        "Task results here",
        'from@example.org',
        emails,
    )
What it looks like when it’s running
To start our celery worker, we run the command “celery -A myproject worker -l info”. The worker will then initialize, start looking for tasks, and run any tasks that it finds:
The above screenshot shows celery running in a console, not daemonized; in production, you’d want to run the worker as a background daemon.
Best practices
Now that we’ve seen a simple task implemented and run, let’s review some best practices for any task that you write:
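- Make your tasks idempotent when you can, so that running a task twice has the same effect as running it once. Tasks may be retried or, with some brokers, delivered more than once. Because tasks run asynchronously, issues that arise from state-dependent tasks are also difficult to troubleshoot.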
- Avoid having one task depend on another task. Since a running task is already in the background and not blocking a user, it doesn’t often make sense to have one task call another instead of just performing the work directly. Tasks waiting on others to finish can cause contention and possibly deadlocks.
- Don’t forget that tasks still use resources. A web page that is slow and RAM intensive will, when converted into a task, still take time to run and consume RAM.
- Tasks are not always fire and forget; they can fail. When it’s critical that a task be completed, be sure to code in error scenarios and/or any retry mechanisms needed.
- Pass object ids instead of objects themselves. If you pass an instance of an object instead of an id, it may have become stale by the time the task has started running. It’s best to re-fetch objects from the database given an id.
- Try to avoid database brokers if you can. They don’t work as well as brokers specifically designed to function as message brokers.
- Be sure to test with CELERY_ALWAYS_EAGER=True (task_always_eager in Celery 4 and later). This makes tasks execute locally and synchronously instead of being sent to the broker, which lets you know right away if there are issues with your tasks. Pair it with CELERY_EAGER_PROPAGATES_EXCEPTIONS=True (task_eager_propagates) so that task exceptions are raised rather than only stored in the result.
- Be sure to test with CELERY_ALWAYS_EAGER=False, too. This seems to contradict the previous best practice, but the reality is you should test both ways. You want to make sure your tasks work (and that errors aren’t hidden by running in the background). You also want to make sure the tasks complete successfully when they are actually handed off to the background queue.
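Here is a sketch of the idempotency advice: a hypothetical task body that records what it has already done, so a retry or duplicate delivery becomes a no-op. The in-memory set is an assumption standing in for persistent storage. A production version would also need the check-and-record step to be atomic (for example, a unique database constraint) so that two workers can’t both pass the check.

```python
# Hypothetical example: a one-time welcome email made idempotent by
# recording what has already been sent. The set stands in for a database
# table or cache shared by all workers.
sent_welcome_emails = set()
outbox = []


def send_welcome_email(user_id):
    if user_id in sent_welcome_emails:
        return "skipped"  # already done: running again changes nothing
    outbox.append(f"welcome email to user {user_id}")
    sent_welcome_emails.add(user_id)
    return "sent"


print(send_welcome_email(42))  # sent
print(send_welcome_email(42))  # skipped (e.g. a retry or duplicate delivery)
print(outbox)                  # ['welcome email to user 42']
```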
Advanced concepts
Now that we’ve gone over best practices, let’s take a look at some of the neat features and tools that celery offers:
Task Retries
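# manually retry a task
# (assumes `app` is your project's Celery instance, e.g. from myproject.celery import app)
@app.task(bind=True, max_retries=3)
def my_task(self, object_ids, emails):
    try:
        raise Exception("Something Bad Happened")
    except Exception as e:
        # re-queue this task to run again in 60 seconds
        raise self.retry(exc=e, countdown=60)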
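Inside a bound task, self.request.retries holds the number of retries so far, and self.retry() accepts a countdown in seconds. A common pattern is therefore to grow the delay with each attempt. The hypothetical helper below sketches that calculation in plain Python: exponential backoff with a cap and optional jitter. Inside the task you might call raise self.retry(exc=e, countdown=retry_countdown(self.request.retries)).

```python
import random


def retry_countdown(retries, base=2, cap=600, jitter=False):
    """Seconds to wait before the next attempt, given retries so far.

    Doubles the delay on each attempt, capped at `cap` seconds. Optional
    "full jitter" picks a random delay up to that value, so many failing
    tasks don't all retry at the same moment.
    """
    delay = min(cap, base * 2 ** retries)
    if jitter:
        delay = random.uniform(0, delay)
    return delay


print([retry_countdown(n) for n in range(6)])  # [2, 4, 8, 16, 32, 64]
print(retry_countdown(20))                     # 600
```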
Queues
Tasks can be assigned to different queue names, and workers can be assigned to run against a certain queue. This can be useful if you want to have dedicated hardware (stronger GPU, perhaps) for thumbnail processing (and a queue for that), and a separate queue for things like generating emails.
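Here is a minimal routing sketch, assuming a standalone configuration module and made-up task names. task_routes is the Celery 4+ setting name; it becomes CELERY_TASK_ROUTES when Django settings are loaded with the CELERY namespace. Each worker is then pointed at its queue with the -Q option. A single call can also override the route with apply_async(..., queue="thumbnails").

```python
# celeryconfig.py (sketch): route hypothetical tasks to named queues.
task_routes = {
    "myapp.tasks.make_thumbnail": {"queue": "thumbnails"},
    "myapp.tasks.send_report_email": {"queue": "emails"},
}

# Each machine then runs a worker that consumes only its own queue, e.g.:
#   celery -A myproject worker -Q thumbnails -l info   (the GPU machine)
#   celery -A myproject worker -Q emails -l info
```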
Rate limiting (per worker)
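# rate limiting: at most 3 of these tasks per second, per worker
# (strings such as "10/m" or "100/h" are also accepted)
@app.task(bind=True, rate_limit=3)
def my_task(self, object_ids, emails):
    pass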
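With rate_limit=3, each worker starts at most three of these tasks per second. Tasks beyond that wait rather than fail. A token bucket is a standard way to enforce such a limit, and Celery’s worker uses a token-bucket approach for this. The class below is a hypothetical, standalone sketch of the idea, not Celery’s code. The caller passes timestamps explicitly so the behaviour is deterministic.

```python
class TokenBucket:
    """Minimal token bucket: refills `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = now

    def try_acquire(self, now):
        # Refill for the time elapsed since the last check, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True   # the task may start now
        return False      # the task has to wait for a token


bucket = TokenBucket(rate=3, capacity=3)
# Five tasks arrive at t=0: three may start, two must wait
burst = [bucket.try_acquire(0.0) for _ in range(5)]
print(burst)  # [True, True, True, False, False]
# One second later the bucket has refilled
later = bucket.try_acquire(1.0)
print(later)  # True
```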
Scheduling
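# scheduling
import datetime

# apply_async takes the task's positional arguments as a single tuple
# (object_ids and emails are assumed to be defined by the caller)
args = (object_ids, emails)
tomorrow = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
my_task.apply_async(args, eta=tomorrow)      # run no earlier than tomorrow
my_task.apply_async(args, expires=tomorrow)  # discard if not started by tomorrow
my_task.apply_async(args, countdown=5)       # run in (at least) 5 seconds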
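A few clarifications on these options:

- countdown=5 is shorthand for an eta five seconds from now.
- eta is the earliest time the task may run, not a guaranteed start time.
- A task that has not started by its expires time is discarded instead of run.

The toy model below sketches those semantics with a heap ordered by eta. It illustrates the rules above under those assumptions; it is not Celery’s scheduler.

```python
import datetime
import heapq

# Toy model (not Celery's implementation): pending messages sit in a heap
# ordered by eta; a worker runs only those that are due and drops any whose
# expiry has already passed.
UTC = datetime.timezone.utc
start = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=UTC)

pending = []  # entries: (eta, name, expires); names are unique
heapq.heappush(pending, (start + datetime.timedelta(seconds=5), "countdown=5", None))
heapq.heappush(pending, (start, "expires soon", start + datetime.timedelta(seconds=1)))
heapq.heappush(pending, (start + datetime.timedelta(days=1), "eta=tomorrow", None))


def run_due(now):
    ran, dropped = [], []
    while pending and pending[0][0] <= now:
        eta, name, expires = heapq.heappop(pending)
        if expires is not None and now > expires:
            dropped.append(name)  # too late: discard instead of running
        else:
            ran.append(name)
    return ran, dropped


outcome = run_due(start + datetime.timedelta(seconds=10))
print(outcome)  # (['countdown=5'], ['expires soon'])
```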
Task prioritization
As mentioned above, you can create different named queues for tasks to run in. Using this feature, you can create a high-priority queue and a low-priority queue to assign task priorities. Unfortunately, at the time of writing Celery had no native approach such as assigning an integer priority value to a task. Newer releases accept a priority option on apply_async, but whether and how it is honored depends on the broker.
Managing groups of tasks
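from celery import chain, chord, group

# calling multiple tasks (assumes `args` holds my_task's arguments)
group([my_task.s(*args), my_task.s(*args)])()  # executes in parallel
# in a chain each return value is prepended to the next task's arguments;
# .si() makes an immutable signature that ignores it
chain(my_task.si(*args), my_task.si(*args))()  # executes in serial
# takes a group of tasks, calls them, then calls a callback on the list of results
# (summarize_results is a hypothetical task that accepts that list)
res = chord([my_task.s(*args), my_task.s(*args)])(summarize_results.s())
res.get()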
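To make the result flow of the three primitives concrete, here is a plain-Python analogy. It uses concurrent.futures instead of Celery, with hypothetical square, add_ten and total functions standing in for tasks:

- a group runs calls in parallel
- a chain feeds each result into the next call
- a chord runs a group and passes the list of results to a callback

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for tasks
def square(x):
    return x * x


def add_ten(x):
    return x + 10


def total(values):
    return sum(values)


with ThreadPoolExecutor() as pool:
    # group: independent calls in parallel, results collected in order
    group_result = list(pool.map(square, [1, 2, 3]))

    # chain: each result becomes the input of the next call, one after another
    chain_result = add_ten(square(3))

    # chord: a group in parallel, then one callback on the list of results
    chord_result = total(list(pool.map(square, [1, 2, 3])))

print(group_result, chain_result, chord_result)  # [1, 4, 9] 19 14
```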
Conclusion
A queuing library like Celery has a variety of uses: it can help improve the user experience, get better resource utilization out of your servers, and handle scheduling issues much more cleanly than a traditional scheduling mechanism like cron. Although there can be some drawbacks to using a queuing mechanism, for the most part it’s relatively simple to implement and understand, and applications can begin reaping the benefits with little to no effort.