Celery for Twisted: manage Celery tasks from Twisted using the Deferred API
Celery is an outstanding choice for dispatching short-lived, computationally expensive tasks to a distributed backend system. The qualifiers matter: Celery is ill-suited for tasks that require updating some in-memory representation with out-of-process data. If you want a specific process to read data from standard input, for instance, good luck...
Twisted can be thought of as having the opposite problem. Twisted is very good at maintaining and updating in-memory representations over extended periods of time, but fails miserably at performing expensive computations, since CPU-bound work blocks its single-threaded reactor. Notably, Twisted has no built-in constructs for managing distributed task queues.
As its name suggests, txCelery elegantly couples these two frameworks together, and in so doing allows them to complement each other. Developers can now create long-running processes whose expensive subroutines are farmed out to a distributed computational cluster.
And best of all, txCelery fully leverages Twisted's Deferred API, so there's no need to drink yet another framework's Kool-Aid.
Note: These instructions assume you have a working installation of Celery.
The recommended way of installing txCelery is through pip. PyPI hosts the latest stable version of txCelery.
First, install pip. On Debian/Ubuntu systems, this is achieved with the sudo apt-get install python-pip command.
Next, let's install the latest stable version of txCelery:
- To install for your user: pip install txCelery --user
- To install system-wide: sudo pip install txCelery
The latest development files can be obtained by cloning the GitHub repo, checking out the dev branch, and running python setup.py develop --user. It is strongly recommended that you do not use the development version in production.
txCelery's API is so simple it brings tears to our eyes. There are exactly one and a half constructs. Yes, one and one half.
In order to use a Celery task with Twisted, you must wrap your Celery task with the DeferrableTask class, used as a decorator. In your tasks.py (or wherever you keep your Celery tasks):
```python
from celery import Celery
from txcelery.defer import DeferrableTask

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')


@DeferrableTask  # outermost: wraps the task object created by @app.task
@app.task
def my_task(*args, **kw):
    # do something
    pass
```
There's just one thing to bear in mind: contrary to the Celery documentation's insistence that @app.task be the top-most decorator in your function definition, DeferrableTask must sit above @app.task. It expects to wrap a Celery task and will raise a TypeError if it receives anything else.
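Python applies stacked decorators bottom-up, so the top-most decorator runs last and receives whatever the decorators below it returned. The toy model below, using only the standard library, shows why the order matters. FakeTask, fake_app_task and deferrable are hypothetical stand-ins for a Celery task object, @app.task and DeferrableTask; this is a sketch of the rule, not txCelery's actual type check.

```python
class FakeTask:
    """Hypothetical stand-in for the task object returned by @app.task."""

    def __init__(self, fn):
        self.fn = fn

    def delay(self, *args, **kw):
        # The real Celery method would enqueue the call; here we just run it.
        return self.fn(*args, **kw)


def fake_app_task(fn):
    """Hypothetical stand-in for Celery's @app.task decorator."""
    return FakeTask(fn)


def deferrable(task):
    """Hypothetical stand-in for DeferrableTask: accept only task objects."""
    if not isinstance(task, FakeTask):
        raise TypeError(f"expected a task, got {type(task).__name__}")
    return task


# Correct order: fake_app_task runs first, so deferrable receives a FakeTask.
@deferrable
@fake_app_task
def add(x, y):
    return x + y


# Wrong order: deferrable runs first and receives a plain function.
try:
    @fake_app_task
    @deferrable
    def broken(x, y):
        return x + y
except TypeError as exc:
    error_message = str(exc)
```

With the decorators in the documented order, add.delay(2, 3) behaves as usual; reversing them fails as soon as the function is defined, which is the kind of early error DeferrableTask's TypeError is meant to surface.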
Once you've wrapped your task with the DeferrableTask decorator, you'll find all the usual task methods like delay, apply_async, subtask, chain, etc. The difference is that those which used to return a celery.result.AsyncResult will now return a twisted.internet.defer.Deferred instance when they are called (OK, actually a subclass of Deferred, but more on that in a second).
So what of this subclass of Twisted's Deferred? It can be thought of as a Deferred that also gives transparent access to all the attributes and methods of its associated AsyncResult instance. It can be thought of in those terms because that's exactly what it is, and that's why this part of the API only constitutes half of a thing to learn.
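The "Deferred plus transparent AsyncResult access" idea can be modeled in plain Python. ToyDeferred is a deliberately minimal stand-in for Twisted's Deferred (callbacks only, no errbacks or nested Deferreds), FakeAsyncResult stands in for celery.result.AsyncResult, and ToyDeferredTask forwards any attribute it does not define itself through __getattr__. All three names are hypothetical; this sketches the idea, it is not txCelery's source.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    def __init__(self, task_id, state="PENDING"):
        self.id = task_id
        self.state = state

    def ready(self):
        # Like AsyncResult.ready(): True once the task reached a final state.
        return self.state in ("SUCCESS", "FAILURE", "REVOKED")


class ToyDeferred:
    """Minimal Deferred-like object: queues callbacks, runs them when fired."""

    def __init__(self):
        self._callbacks = []
        self._fired = False
        self._result = None

    def addCallback(self, fn):
        self._callbacks.append(fn)
        if self._fired:
            self._run()
        return self  # allows chaining, as with Twisted's Deferred

    def callback(self, value):
        self._fired = True
        self._result = value
        self._run()

    def _run(self):
        # Each callback receives the previous callback's return value.
        while self._callbacks:
            self._result = self._callbacks.pop(0)(self._result)


class ToyDeferredTask(ToyDeferred):
    """Deferred-like object that forwards unknown attributes to its AsyncResult."""

    def __init__(self, async_result):
        super().__init__()
        self.async_result = async_result

    def __getattr__(self, name):
        # Only called when normal lookup fails, so Deferred methods take priority.
        async_result = self.__dict__.get("async_result")
        if async_result is None:
            raise AttributeError(name)
        return getattr(async_result, name)


d = ToyDeferredTask(FakeAsyncResult("abc123"))
seen = []
d.addCallback(lambda value: value * 2).addCallback(seen.append)
d.callback(21)
```

The same object answers both d.addCallback(...) (its own Deferred behavior) and d.id or d.ready() (forwarded to the wrapped result), which is the "one object, two APIs" property described above.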
Our subclass is called DeferredTask. It lives in txcelery.defer, and as far as Twisted is concerned, it's just a plain old Deferred. DeferredTasks can be chained, returned from functions called through maybeDeferred, joined via gatherResults and DeferredList, etc.
DeferredTask monitors the state of the task and fires its callback chain if the task succeeds, or its errback chain if the task fails. If the task is revoked, DeferredTask fires its errback chain with a twisted.internet.defer.CancelledError as its Failure value.
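Here's a rough sketch of that state-to-outcome mapping, assuming the wrapper periodically checks the task's Celery state (polling is an assumption; txCelery may track state differently). CancelledError and TaskFailed are hypothetical stand-ins for the Twisted and Celery exception types, and a plain loop replaces the reactor-driven scheduling (for example reactor.callLater) that a real Twisted implementation would use, so the sketch runs without Twisted installed.

```python
class CancelledError(Exception):
    """Hypothetical stand-in for twisted.internet.defer.CancelledError."""


class TaskFailed(Exception):
    """Hypothetical wrapper for the error a failed task reported."""


def settle(state, result):
    """Map a Celery task state to ('callback', value), ('errback', exc) or None.

    None means the task has not finished yet and should be checked again later.
    """
    if state == "SUCCESS":
        return ("callback", result)
    if state == "FAILURE":
        return ("errback", TaskFailed(result))
    if state == "REVOKED":
        return ("errback", CancelledError())
    return None  # PENDING, STARTED, RETRY, ...


def poll(observed):
    """Walk observed (state, result) pairs until one is final.

    Returns (number_of_checks, outcome); outcome is None if no final state was seen.
    """
    for attempt, (state, result) in enumerate(observed, start=1):
        outcome = settle(state, result)
        if outcome is not None:
            return attempt, outcome
    return len(observed), None
```

Keeping the mapping in a pure function like settle makes the success, failure and revocation cases easy to test separately from whatever scheduling drives the checks.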
- Wrap a task with a DeferrableTask
- Call task methods and obtain a DeferredTask instance in lieu of an AsyncResult
- Use DeferredTask as if it were a regular Deferred or a regular AsyncResult
And that's really all there is to it.