Priority routing/queuing with Celery

When you’re using the Celery task queue in your Python project, especially one that handles lots of tasks, it can be useful to implement priority routing.

The most common use case is probably adding a high-priority queue adjacent to the default queue, but you could just as well invert this and add a low-priority queue. Or both.

Implementation is simple.

Make sure this setting is enabled (it should be by default):

CELERY_CREATE_MISSING_QUEUES = True

Configure the name of your non-default queue somewhere:

CELERY_PRIO_QUEUE = 'my-high-priority-queue'

Define task routes. There are two ways of doing this. I prefer setting the route on the task definition.

When using the @task decorator:

@task(queue=settings.CELERY_PRIO_QUEUE)

When using class-based tasks:

class MyTask(Task):
    queue = settings.CELERY_PRIO_QUEUE

The other way is using a setting:

CELERY_ROUTES = {'myapp.tasks.mytask': {'queue': CELERY_PRIO_QUEUE}}
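If you want both a high-priority and a low-priority queue next to the default one, the routes setting might look roughly like the sketch below. The low-priority queue name and both task names are made up for illustration; any task not listed should keep going to the default queue.

```python
# Hypothetical queue names, for illustration only.
CELERY_PRIO_QUEUE = 'my-high-priority-queue'
CELERY_LOW_PRIO_QUEUE = 'my-low-priority-queue'

# Hypothetical task names: one urgent task, one that can wait.
CELERY_ROUTES = {
    'myapp.tasks.send_password_reset': {'queue': CELERY_PRIO_QUEUE},
    'myapp.tasks.rebuild_search_index': {'queue': CELERY_LOW_PRIO_QUEUE},
}
```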

Now, this is important. These routes will only be used when your tasks are being executed asynchronously (e.g. using apply_async). Periodic tasks will still be scheduled on the default queue. This “feature” has cost me a few hours of debugging.

For your periodic tasks (when using celerybeat), don’t forget to define queues for those as well:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'my-task': {
        'task': 'myapp.tasks.mytask',
        'schedule': crontab(minute='*/10'),  # every 10 minutes
        'options': {'queue': CELERY_PRIO_QUEUE},
    },
}
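Since a missing queue on a periodic task is easy to overlook, a small sanity check can help. The sketch below is only an illustration: the helper `entries_without_queue` and the second schedule entry are hypothetical, and `timedelta` stands in for `crontab` so the snippet runs without Celery installed (beat accepts both as a schedule).

```python
from datetime import timedelta


def entries_without_queue(schedule):
    """Return the names of schedule entries that set no explicit queue."""
    return sorted(
        name for name, entry in schedule.items()
        if 'queue' not in entry.get('options', {})
    )


CELERY_PRIO_QUEUE = 'my-high-priority-queue'

# Hypothetical schedule: the second entry forgets to set a queue.
CELERYBEAT_SCHEDULE = {
    'my-task': {
        'task': 'myapp.tasks.mytask',
        'schedule': timedelta(minutes=10),
        'options': {'queue': CELERY_PRIO_QUEUE},
    },
    'my-other-task': {
        'task': 'myapp.tasks.othertask',
        'schedule': timedelta(hours=1),
    },
}

print(entries_without_queue(CELERYBEAT_SCHEDULE))  # ['my-other-task']
```

Running something like this in a test or at startup flags entries that would otherwise quietly land on the default queue.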

The final step is consuming your additional queue(s). A Celery worker can consume one or more queues like this:

celery -A myproject worker -Q queue1
celery -A myproject worker -Q queue1,queue2
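One thing to keep in mind: a worker started with -Q consumes only the queues you list, so something still has to consume the default queue. The sketch below builds one command per dedicated worker. The helper `worker_command` is hypothetical, `celery` is assumed to be the default queue name (it is, unless your project overrides it), and `-A` is placed before `worker` because newer Celery releases require that order.

```python
import shlex


def worker_command(app, queues):
    """Build a `celery worker` command line that consumes the given queues."""
    return shlex.join(['celery', '-A', app, 'worker', '-Q', ','.join(queues)])


# One dedicated worker for the priority queue, one for the default queue.
print(worker_command('myproject', ['my-high-priority-queue']))
print(worker_command('myproject', ['celery']))

# Alternatively, a single worker that consumes both queues.
print(worker_command('myproject', ['my-high-priority-queue', 'celery']))
```

Separate workers keep a backlog on the default queue from delaying priority tasks; a single worker listening on both queues is simpler but gives no such isolation.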

All set. Happy scheduling!
