
This recipe presents a simple function for running several 'works' concurrently with Twisted. A 'work' is performed by any object that provides the IWorker interface defined in the code; downloading a web page is one example.

Python, 137 lines
from __future__ import nested_scopes

from zope.interface import Interface, Attribute
from twisted.internet import defer


class IWorker(Interface):
    """A worker is an object that can do some 'work' and return
    its result in a deferred.
    """

    deferred = Attribute("""A Deferred that is fired at work completion""")

    def startWork():
        """Start the work.
        """


def spawnDeferredWorkers(workerList, n):
    """Run several works concurrently.

    workerList is a list of objects that provide the IWorker interface.
    n specifies how many works to run concurrently.
    
    Return a list of Deferred objects, which will fire with the result of each work.
    """

    # Create the iterator before attaching callbacks, so that a deferred
    # which has already fired does not find it undefined.
    workerIterator = iter(workerList)

    def startNext(result=None):
        # Start the next pending work, if any, and pass the result (or
        # the failure) through unchanged so later callbacks still see it.
        worker = next(workerIterator, None)
        if worker is not None:
            worker.startWork()

        return result

    deferredList = []
    for worker in workerList:
        deferred = worker.deferred
        # addBoth runs startNext on success and on failure alike
        deferred.addBoth(startNext)

        deferredList.append(deferred)

    # begin the first n works
    for i in range(n):
        startNext()

    return deferredList


def getRFC(n = 10, m = 3):
    """An example of spawnDeferredWorkers that downloads a list of RFCs.
    
    n specifies how many RFCs (starting from 1) to download.
    m specifies how many downloads to run concurrently.
    """
    
    from zope.interface import implements
    from twisted.web import client
    from twisted.internet import reactor
    
    import time

    
    class HTTPClientWorker(client.HTTPDownloader):
        """
        Adaptation of client.HTTPDownloader to IWorker interface
        """
        implements(IWorker)

        def __init__(self, *args, **kwargs):
            client.HTTPDownloader.__init__(self, *args, **kwargs)

        def startWork(self):
            print "starting downloading %s..." % self.url
            reactor.connectTCP(self.host, self.port, self)


    # callback functions
    def gotPageList(result):
        pages = len(result)
        errors = len(filter(lambda item: not item[0], result))
        print "\n\ngot %d pages with %d errors" % (pages, errors)
        
        end = time.time()
        print "%d secs elapsed" % (end - start)
        
        reactor.stop()

    def savePage(page, worker):
        print "got %s" % worker.url

    def error(reason, worker):
        print "failed to download %s" % worker.url
        print reason.value.__class__, reason.value
        
        return reason


    workerList = [HTTPClientWorker("http://www.ietf.org/rfc/rfc%d.txt" % rfc,
                                   "rfc%d.txt" % rfc)
                  for rfc in range(1, n + 1)]

    deferredList = spawnDeferredWorkers(workerList, m)
    for deferred, worker in zip(deferredList, workerList):
        deferred.addCallback(savePage, worker).addErrback(error, worker)

    deferred = defer.DeferredList(deferredList, consumeErrors = True)
    deferred.addCallback(gotPageList)

    # time.time() measures wall-clock time on every platform, whereas
    # time.clock() reports CPU time on Unix
    start = time.time()

    reactor.run()


if __name__ == "__main__":
    import sys
    
    getRFC(int(sys.argv[1]), int(sys.argv[2]))

This recipe requires Twisted (http://twistedmatrix.com) and the zope.interface package (http://www.zope.org/Products/ZopeInterface).

The algorithm is very simple: the function starts the first n works, and whenever a work ends (successfully or with an error) the next one is started. I hope that the comments in the code and in the example are clear.
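The scheduling order can be seen without Twisted or a network. The following is only a minimal sketch of the same idea: FakeWorker, its on_done list and spawn_workers are hypothetical stand-ins invented for this illustration (they are not part of the recipe or of Twisted), and completion is triggered by hand instead of by a Deferred.

```python
class FakeWorker:
    """Hypothetical stand-in for an IWorker (not a Twisted class).

    It records when it starts and is finished by hand.
    """

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.on_done = []  # plain callables run at completion

    def startWork(self):
        self.log.append("start %s" % self.name)

    def finish(self):
        self.log.append("end %s" % self.name)
        for callback in self.on_done:
            callback()


def spawn_workers(workers, n):
    """Start the first n workers; each completion starts the next one."""
    pending = iter(workers)

    def start_next():
        worker = next(pending, None)
        if worker is not None:
            worker.startWork()

    for worker in workers:
        worker.on_done.append(start_next)
    for _ in range(n):
        start_next()


log = []
workers = [FakeWorker(name, log) for name in "abcd"]
spawn_workers(workers, 2)  # a and b start immediately
workers[1].finish()        # b ends, so c starts
workers[0].finish()        # a ends, so d starts
print(log)
```

At most two workers are active at any time here, and a new one starts only when a running one ends, whichever that happens to be.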

The test function uses twisted.web.client.HTTPDownloader as the 'worker' to download a list of RFC documents. Another possible example is to download the same file concurrently (as done by some download accelerators).
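For the download-accelerator idea, each worker would fetch one slice of the file. The sketch below only shows how the slices could be computed. It assumes that the file size is already known and that the server honours HTTP Range requests; split_ranges and range_header are hypothetical helper names, not part of the recipe or of Twisted.

```python
def split_ranges(size, parts):
    """Split a file of `size` bytes into at most `parts` contiguous ranges.

    Returns (first, last) pairs with both ends inclusive, which is how
    the HTTP Range header counts bytes.
    """
    if size <= 0 or parts <= 0:
        return []
    parts = min(parts, size)  # never create empty ranges
    base, extra = divmod(size, parts)
    ranges = []
    start = 0
    for i in range(parts):
        # the first `extra` ranges take one leftover byte each
        length = base + (1 if i < extra else 0)
        ranges.append((start, start + length - 1))
        start += length
    return ranges


def range_header(first, last):
    """Build the request header for one inclusive byte range."""
    return {"Range": "bytes=%d-%d" % (first, last)}


for first, last in split_ranges(10, 3):
    print(range_header(first, last))
```

One worker per range could then be handed to spawnDeferredWorkers, with the pieces joined in order once every deferred has fired.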

2 comments

Andrew Bennetts 18 years, 12 months ago

Should use utility functions in twisted.internet.defer. This recipe would benefit from using twisted.internet.defer.gatherResults or perhaps some other helper function in that module, rather than inventing its own. It would make this recipe much simpler.

Manlio Perillo (author) 18 years, 12 months ago

I don't think so. spawnDeferredWorkers returns a list of Deferreds. You can do whatever you want with it.