One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.
A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.
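To make the difference concrete, here is a minimal sketch that uses the standard library's threading primitives rather than Twisted (the variable names are illustrative only). Non-blocking acquires show how many acquisitions succeed before a release is needed:

```python
import threading

lock = threading.Lock()          # a mutex: one acquisition at a time
sem = threading.Semaphore(3)     # a counting semaphore configured for three

# Try to acquire the mutex twice without releasing in between.
# blocking=False makes acquire() return False instead of waiting.
lock_results = [lock.acquire(blocking=False) for _ in range(2)]

# Try to acquire the semaphore four times without releasing in between.
sem_results = [sem.acquire(blocking=False) for _ in range(4)]

print(lock_results)  # [True, False]
print(sem_results)   # [True, True, True, False]
```

DeferredSemaphore applies the same counting rule, except that a caller that cannot acquire gets a Deferred that fires later instead of blocking a thread.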
Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:
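
# Python 2 syntax (print statement). Note that `async` is a reserved
# word from Python 3.7 on, so the function needs another name there.
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor

def async(n):
    print 'Starting job', n
    # Simulate an asynchronous operation that takes n seconds.
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d

def main():
    # Allow at most three jobs to hold the semaphore at once.
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    # Stop the reactor once every job has finished.
    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()

if __name__ == '__main__':
    main()
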
DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.

The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.
Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.
Here's an example that combines DeferredQueue and cooperate:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor

def async(n):
    print 'Starting job', n
    # Simulate an asynchronous operation that takes n seconds.
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d

def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)

def worker(jobs):
    # Pull one job at a time; cooperate waits for each yielded
    # Deferred before resuming this generator.
    while True:
        yield jobs.get().addCallback(async)

def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    # Three workers means at most three jobs in progress at once.
    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()

if __name__ == '__main__':
    main()

Note that the async function is the same as the one from the first example. However, this time, there's also a worker function which explicitly pulls jobs out of the DeferredQueue and processes them with async (by adding async as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main function, then, starts three of these worker generators so that three jobs will be in progress at any given time.
This approach involves a bit more code than the DeferredSemaphore approach, but has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause, resume, and a couple of others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.