This recipe presents a simple function for running several works concurrently with Twisted. A 'work' is an abstraction for an object that satisfies the IWorker interface presented in the code. An example of work is downloading a web page.
from __future__ import nested_scopes
from zope.interface import Interface, Attribute
from twisted.internet import defer


class IWorker(Interface):
    """A worker is an object that can do some 'work' and return
    its result in a deferred.
    """

    deferred = Attribute("""A Deferred that is fired at work completion""")

    def startWork():
        """Start the work.
        """


def spawnDeferredWorkers(workerList, n):
    """Run several works concurrently.

    workerList is a list of objects that satisfy the IWorker interface.
    n specifies how many works to run concurrently.

    Return a list of Deferred objects, each of which fires with the
    result of the corresponding work.
    """
    def startNext():
        # start the next pending work, if any is left
        try:
            worker = next(workerIterator)
        except StopIteration:
            return
        worker.startWork()

    def callback(result):
        # a work succeeded: start the next one and pass the result on
        startNext()
        return result

    def errback(reason):
        # a work failed: start the next one anyway and propagate the failure
        startNext()
        return reason

    deferredList = []
    for worker in workerList:
        deferred = worker.deferred
        deferred.addCallback(callback).addErrback(errback)
        deferredList.append(deferred)

    workerIterator = iter(workerList)

    # begin the first n works
    for i in range(n):
        startNext()

    return deferredList


def getRFC(n = 10, m = 3):
    """An example of spawnDeferredWorkers for getting a list of RFCs.

    n specifies how many RFCs (starting from 1) to download.
    m specifies how many downloads to run concurrently.
    """
    from zope.interface import implementer
    from twisted.web import client
    from twisted.internet import reactor
    import time

    @implementer(IWorker)
    class HTTPClientWorker(client.HTTPDownloader):
        """
        Adaptation of client.HTTPDownloader to IWorker interface
        """
        def __init__(self, *args, **kwargs):
            client.HTTPDownloader.__init__(self, *args, **kwargs)

        def startWork(self):
            print("starting downloading %s..." % self.url)
            reactor.connectTCP(self.host, self.port, self)

    # callback functions
    def gotPageList(result):
        # result is a list of (success, value) pairs built by DeferredList
        pages = len(result)
        errors = len([success for success, value in result if not success])
        print("\n\ngot %d pages with %d errors" % (pages, errors))
        end = time.time()
        print("%d secs elapsed" % (end - start))
        reactor.stop()

    def savePage(page, worker):
        print("got %s" % worker.url)

    def error(reason, worker):
        print("failed to download %s" % worker.url)
        print("%s %s" % (reason.value.__class__, reason.value))
        return reason

    workerList = [HTTPClientWorker("http://www.ietf.org/rfc/rfc%d.txt" % rfc,
                                   "rfc%d.txt" % rfc)
                  for rfc in range(1, n + 1)]
    deferredList = spawnDeferredWorkers(workerList, m)
    for deferred, worker in zip(deferredList, workerList):
        deferred.addCallback(savePage, worker).addErrback(error, worker)

    deferred = defer.DeferredList(deferredList, consumeErrors = True)
    deferred.addCallback(gotPageList)

    # wall-clock start time (time.clock() would measure CPU time on Unix)
    start = time.time()
    reactor.run()


if __name__ == "__main__":
    import sys
    getRFC(int(sys.argv[1]), int(sys.argv[2]))

This recipe requires Twisted (http://twistedmatrix.com) and the zope.interface package (http://www.zope.org/Products/ZopeInterface).
The algorithm used is very simple. The function starts the first n works; then, whenever a work ends (successfully or with an error), the next one is started. I hope that the comments in the code and in the example are clear.
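The scheduling idea can be tried out without Twisted or a network. The following is only a sketch under assumptions: FakeWorker, its on_done list and finish() method, and spawn_workers are hypothetical stand-ins written for illustration (plain callbacks take the place of the Deferred), not part of the recipe.

```python
class FakeWorker:
    """Hypothetical stand-in for an IWorker; completion is triggered by hand."""

    def __init__(self, name):
        self.name = name
        self.started = False
        self.on_done = []          # callbacks run when the work completes

    def startWork(self):
        self.started = True

    def finish(self):
        # simulate the worker's Deferred firing
        for callback in self.on_done:
            callback(self)


def spawn_workers(workers, n):
    """Start the first n workers; start one more each time a work ends."""
    pending = iter(workers)

    def start_next(_finished=None):
        worker = next(pending, None)
        if worker is not None:
            worker.startWork()

    for worker in workers:
        worker.on_done.append(start_next)
    for _ in range(n):
        start_next()


workers = [FakeWorker("w%d" % i) for i in range(5)]
spawn_workers(workers, 2)
started_at_first = [w.name for w in workers if w.started]

workers[0].finish()            # one work ends, so one more may start
started_after_one = [w.name for w in workers if w.started]
```

Only two workers are started at first; a third is started once the first one finishes, so at most n works are in progress at any time.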
The test function uses twisted.web.client.HTTPDownloader as the 'worker' to download a list of RFC documents. Another possible example is to download the same file concurrently (as done by some download accelerators).
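For the download-accelerator variant, each worker would have to fetch a different slice of the same file. Here is a minimal sketch of the slicing step only, assuming the server honours HTTP Range requests and the total size is already known; split_ranges and range_header are hypothetical helpers, not part of this recipe or of Twisted.

```python
def split_ranges(total_size, parts):
    """Split total_size bytes into contiguous, inclusive (start, end) ranges."""
    if total_size <= 0 or parts <= 0:
        raise ValueError("total_size and parts must be positive")
    parts = min(parts, total_size)      # never create empty ranges
    base, extra = divmod(total_size, parts)
    ranges = []
    start = 0
    for i in range(parts):
        # the first `extra` ranges get one additional byte each
        length = base + (1 if i < extra else 0)
        ranges.append((start, start + length - 1))
        start += length
    return ranges


def range_header(start, end):
    """Format one range as an HTTP Range header value (end is inclusive)."""
    return "bytes=%d-%d" % (start, end)


ranges = split_ranges(10, 3)
headers = [range_header(start, end) for start, end in ranges]
```

Each header value would then be given to one worker, and the downloaded pieces written back at their start offsets.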
Should use utility functions in twisted.internet.defer. This recipe would benefit from using twisted.internet.defer.gatherResults or perhaps some other helper function in that module, rather than inventing its own. It would make this recipe much simpler.
I don't think so. spawnDeferredWorkers returns a list of Deferreds. You can do whatever you want with them.