Date: 2018-10-23

Background

I use Twisted and Celery daily at work. Both are useful frameworks, and both have a lot of great information out there, but one particular use that I haven’t seen discussed much online (hence this post) is calling Celery tasks from Twisted and subsequently using the result.

The difference between Twisted and Celery seems to be a frequent question (check out the number of questions on StackOverflow). The main difference, from my point of view, is that Twisted is a “batteries included” networking framework that is asynchronous / evented for handling I/O, while Celery is a distributed task queue that excels at short CPU-bound tasks, where the asynchronous nature comes from running multiple processes. The txCelery project has a nice summary on their page:

> Celery is an outstanding choice for dispatching short-lived, computationally-expensive tasks to a distributed backend system. Note the emphasis; Celery is ill-suited for tasks that require updating some in-memory representation with out-of-process data. If you want a specific process to read data from standard input, for instance, good luck…
>
> Twisted can be thought of as having the opposite problem. Twisted is very good at maintaining and updating in-memory representations over extended periods of time, but fails miserably at performing expensive computations. Twisted notably has no built-in constructs for managing distributed task queues.

The two main ways to interact with Celery are:

1. Call a task (and have it happen asynchronously).
2. Get the result of that task.

In order to call Celery from Twisted we’ll need to ensure that both of these can happen in a way that matches Twisted’s execution model.
This might be illustrated best by a code example. We would want to do something like the following:

    from celery import Celery
    from twisted.internet import defer, task

    app = Celery(broker='amqp://guest:guest@127.0.0.1:5672//', backend='rpc')


    @app.task
    def my_task(a, b):
        # Normally you'd do a CPU bound task here.
        return a + b


    @defer.inlineCallbacks
    def main(reactor):
        # task.react() passes the reactor as the first argument.
        # Normally you'd open some listening sockets or some outbound
        # connections here.

        # Call into Celery!
        result = yield my_task.delay(1, 2)

        # Should print '1 + 2 = 3'.
        print("{} + {} = {}".format(1, 2, result))


    if __name__ == '__main__':
        task.react(main)

Initial Approach

The initial attempt to pair these libraries is to call Celery tasks like you would normally (import the Celery task and call delay() on it). There’s a hint in the documentation that this won’t work, however:

> Apply tasks asynchronously by sending a message.

The documentation for AsyncResult.get(), which retrieves the result, also hints at issues:

> Wait until task is ready, and return its result.

It isn’t incredibly clear from the documentation, but this points to two separate issues with calling Celery from Twisted:

1. In both of these situations, (synchronous) I/O happens in the background when you make the call! I/O happening outside of the reactor is a big no-no in a Twisted process since it can block the reactor from running.
2. Additionally, the AsyncResult.get() call blocks until a result is ready, which doesn’t fit well into an evented programming paradigm.

The second problem is handled well by the txCelery package: it allows you to call a task and get back a subclass of Deferred, which resolves to the result of the task call by periodically monitoring the status of the result. Unfortunately, txCelery doesn’t solve the first issue (to my knowledge), since it just uses the normal mechanisms built into Celery for I/O (which causes blocking I/O in the reactor thread).
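To make the first issue a little more concrete, here is a minimal sketch of what a synchronous call does to a single-threaded event loop. It is not Twisted or Celery code: asyncio from the standard library stands in for the reactor so the sketch runs without a broker, and blocking_io(), heartbeat() and measure() are hypothetical names made up for this illustration. The idea is simply to compare the longest pause in a periodic "heartbeat" when a blocking call runs on the loop thread versus on a worker thread.

```python
import asyncio
import time


async def heartbeat(gaps, interval=0.02):
    """Record the time between consecutive wake-ups of the event loop."""
    last = time.monotonic()
    while True:
        await asyncio.sleep(interval)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


def blocking_io(duration=0.5):
    """Hypothetical stand-in for a synchronous round trip to a broker."""
    time.sleep(duration)
    return "done"


async def measure(run_in_thread):
    """Return the longest heartbeat gap seen while blocking_io() runs."""
    gaps = []
    ticker = asyncio.ensure_future(heartbeat(gaps))
    await asyncio.sleep(0.1)  # let the heartbeat tick a few times first
    if run_in_thread:
        loop = asyncio.get_running_loop()
        # Roughly the role deferToThread() plays in Twisted.
        await loop.run_in_executor(None, blocking_io)
    else:
        blocking_io()  # runs on the loop thread, nothing else can run
    await asyncio.sleep(0.1)  # give the heartbeat a chance to tick again
    ticker.cancel()
    return max(gaps)


stalled_gap = asyncio.run(measure(run_in_thread=False))
threaded_gap = asyncio.run(measure(run_in_thread=True))
print("longest gap, blocking call on the loop: {:.2f}s".format(stalled_gap))
print("longest gap, blocking call in a thread: {:.2f}s".format(threaded_gap))
```

In the first run the heartbeat cannot fire for the whole duration of the blocking call, which is the same thing that would happen to every socket and timer in a Twisted process while Celery talks to its broker on the reactor thread.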

Threaded Approach

Twisted has a thread pool and makes it super easy to ask it to “run this code in a non-reactor thread and return the result to me”. It is pretty straightforward to cobble together a way to use threads.deferToThread() to call Celery.send_task().

You still need an asynchronous way to check if the task result is ready, however. You could just call AsyncResult.get() in a thread, but you will likely exhaust your thread pool quickly, since each call blocks until a result is ready. Alternatively, you can check the status of a task using the state property of an AsyncResult. (I found it very surprising that accessing a property of this object causes I/O to happen, but it does.)

Connecting these ideas together, we came up with something similar to the following (note that this is heavily inspired by what txCelery does, but pushes all I/O onto a separate thread instead of doing it in the reactor thread):

    from celery import states
    from twisted.internet import defer, reactor, threads


    def send_task(app, *args, **kwargs):
        # Call the real function via a background thread.
        d = threads.deferToThread(app.send_task, *args, **kwargs)
        return d.addCallback(CeleryDeferred)


    class CeleryDeferred(defer.Deferred, object):
        """
        Extension of the Twisted Deferred object that wraps a Celery
        AsyncResult.

        This Deferred will occasionally poll the Celery task for its status.
        When complete, the Deferred will resolve and any added callbacks will
        be run.

        Inspired by txCelery: https://github.com/SentimensRG/txCelery/
        """
        CHECK_INTERVAL = 0.25

        def __init__(self, async_result):
            # Deferred is an old-style class
            defer.Deferred.__init__(self, self._canceller)

            self.async_result = async_result

            # Start the monitor loop
            self.check_state()

        def check_state(self):
            """Check the status of the celery task on another thread."""
            threads.deferToThread(self.get_state).addCallbacks(
                self.state_received, self.errback)

        def get_state(self):
            """Check the status of the celery task directly"""
            return self.async_result.state

        def state_received(self, celery_state):
            """Called when the check_state thread finishes"""
            if self.called:
                # Already fired (e.g. cancelled), so stop polling.
                return

            if celery_state in states.UNREADY_STATES:
                # Schedule another status check, to be run later.
                reactor.callLater(self.CHECK_INTERVAL, self.check_state)
            elif celery_state == states.SUCCESS:
                self.callback(self.async_result.result)
            elif celery_state == states.FAILURE:
                # This will contain the Exception instance if the task raised one
                # http://docs.celeryproject.org/en/latest/reference/celery.result.html
                self.errback(self.async_result.result)
            elif celery_state == states.REVOKED:
                self.errback(defer.CancelledError(
                    'Task {0}'.format(self.async_result.id)))
            else:
                # An unknown state was returned.
                self.errback(ValueError(
                    'Unknown state: `{}`'.format(celery_state)))

        def _canceller(self, _deferred):
            # Twisted calls the canceller with the Deferred being cancelled
            # (which is this object). Revoke the celery task.
            self.async_result.revoke()

The calling syntax of this isn’t as nice as the initial approach, but it is pretty close (the changes are the send_task import and the send_task() call):

    from celery import Celery
    from twisted.internet import defer, task

    from twistedcelery import send_task

    app = Celery(broker='amqp://guest:guest@127.0.0.1:5672//', backend='rpc')


    # An explicit name, so that the string passed to send_task() matches the
    # registered task (the default name includes the module path).
    @app.task(name='my_task')
    def my_task(a, b):
        # Normally you'd do a CPU bound task here.
        return a + b


    @defer.inlineCallbacks
    def main(reactor):
        # task.react() passes the reactor as the first argument.
        # Normally you'd open some listening sockets or some outbound
        # connections here.

        # Call into Celery!
        result = yield send_task(app, 'my_task', args=(1, 2))

        # Should print '1 + 2 = 3'.
        print("{} + {} = {}".format(1, 2, result))


    if __name__ == '__main__':
        task.react(main)
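As a footnote to the remark about exhausting the thread pool by calling AsyncResult.get() in a thread, the effect can be sketched without Twisted or Celery at all. The snippet below uses the standard library's ThreadPoolExecutor as a stand-in for the reactor's thread pool, and blocking_get(), quick_job() and POOL_SIZE are hypothetical names chosen for the illustration. The pool is deliberately tiny so the starvation shows up immediately; with a real pool it would just take more in-flight tasks.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

POOL_SIZE = 2  # hypothetical and deliberately tiny

# Set once the (pretend) task results are available.
result_ready = threading.Event()


def blocking_get():
    """Stand-in for AsyncResult.get(): holds a thread until a result exists."""
    result_ready.wait()
    return "task result"


def quick_job():
    """Any other short piece of work that also needs a pool thread."""
    return "quick"


pool = ThreadPoolExecutor(max_workers=POOL_SIZE)

# One blocking wait per pool thread: every worker is now parked.
waiters = [pool.submit(blocking_get) for _ in range(POOL_SIZE)]

# This job is queued behind the waiters and cannot start.
starved = pool.submit(quick_job)
try:
    starved.result(timeout=0.2)
    was_starved = False
except FutureTimeout:
    was_starved = True

# Once the results "arrive", the waiters return and the queue drains.
result_ready.set()
quick_result = starved.result(timeout=5)
waiter_results = [w.result(timeout=5) for w in waiters]
pool.shutdown()

print("quick job starved while waiters blocked:", was_starved)
```

Polling the state in short-lived thread calls, as CeleryDeferred does, avoids this because each call hands its thread back to the pool as soon as the state has been read.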