
processing.Pool (http://pypi.python.org/pypi/processing) is a nice tool to "parallelize" map() across multiple CPUs. However, imagine that X threads send the same request, Pool.map(getNthPrimeNumber, [100000, 10000000, 10000]), at (almost) the same time. Obviously, you don't want to compute getNthPrimeNumber for 100000, 10000000 and 10000 X times each... unless you have 3×X processors available. You would rather have one thread submit the 3 requests, while the X-1 others notice that these requests have already been submitted and simply wait for the results. This is what this code is about: a kind of "transient memoize" for processing.Pool.imap().

Python, 215 lines
## @brief A simple variant of processing.Pool that accepts requests
#  from different threads.


# Import 'multiprocessing' package (formerly known as 'processing'):
try:
    # Tested with python 2.6 b3
    from multiprocessing import Pool
except ImportError:
    # Tested with standalone processing 0.52
    from processing import Pool

import threading, sys


class MultiThreadedPool:
    """
    A simple variant of processing.Pool that accepts requests
    from different threads: makes sure that requests being processed by
    the worker processes are not redundant.

    When a thread B submits a request which is already being processed
    in the background for another thread A, then B doesn't re-submit the
    request: it waits for the same result object as A.

    This package makes the following assumption:
    - the result of the function to call is entirely determined by its
      arguments (resp. "function", "params")

    As a consequence, in order to determine whether a "request" has
    already been submitted by another thread, we ONLY compare the
    couples (function, params). If a submitted request has the same
    couple (function, params) as a request in progress, then we wait
    for this request to be completed (valid result, or exception)

    This Pool should be safe wrt exceptions in the remote function.
    Only the map() and imap() methods are implemented.
    """
    __lock      = None # threading.Lock object
    __inflight  = None # dict: (function, params) -> processing.ApplyResult obj
    __workers   = None # processing.Pool object

    def __init__(self, processes=None, initializer=None, initargs=()):
        """See processing.Pool.__init__()"""
        self.__workers  = Pool(processes, initializer, initargs)
        self.__inflight = dict()
        self.__lock     = threading.Lock()

        # Apply locking decorator on close/terminate/join
        self._unregister_jobs = self.__make_synchronized(self._unregister_jobs)
        self.close     = self.__make_synchronized(self.__workers.close)
        self.terminate = self.__make_synchronized(self.__workers.terminate)
        self.join      = self.__make_synchronized(self.__workers.join)

    def apply(self, func, args=()):
        """Equivalent to processing.Pool::apply(), but without the kwds{}
        argument"""
        self.__lock.acquire()
        try:
            key, job, i_am_owner = self._apply_async(func, args)
        finally:
            self.__lock.release()

        # Wait for result
        try:
            return job.get()
        finally:
            self._unregister_jobs([(key, job, i_am_owner)])

    def imap(self, func, iterable):
        """Equivalent to processing.Pool.imap(), but without the
        "chunksize" argument. Since this is a generator, the jobs are only
        submitted when the iteration starts."""
        jobs = [] # list of tuples (key, result_object, bool_i_started_the_job)
        # Build the list of jobs started in the background
        self.__lock.acquire()
        try:
            for param in iterable:
                jobs.append(self._apply_async(func, (param,)))
        finally:
            self.__lock.release()

        # Wait for everybody
        try:
            for key, job, i_am_owner in jobs:
                yield job.get()
        finally:
            self._unregister_jobs(jobs)

    def map(self, func, iterable):
        """Equivalent to processing.Pool.map(), but without the
        "chunksize" argument"""
        return list(self.imap(func, iterable))

    def _apply_async(self, func, args):
        """Return a tuple (inflight_key, applyResult object, i_am_owner)"""
        key = (func, args)
        try:
            # Job already started by somebody else
            job = self.__inflight[key]
            return key, job, False
        except KeyError:
            # We have to start a new job
            job = self.__workers.apply_async(func, args)
            self.__inflight[key] = job
            return key, job, True

    def _unregister_jobs(self, jobs):
        """
        Remove all the given "in flight" jobs.
        Due to a limitation of processing 0.52, we have to wake up the
        additional threads waiting for the result by hand. The correct
        fix in processing would be to replace self._cond.notify() in
        ApplyResult._set() with self._cond.notifyAll()
        """
        for key, job, i_am_owner in jobs:
            # Begin workaround
            # processing.ApplyResult._set doesn't call notifyAll(),
            # so we have to do it ourselves.
            # Keep this before the i_am_owner test below: nothing
            # guarantees that the owner will be the first to wake up!
            job._cond.acquire()
            job._cond.notifyAll()
            job._cond.release()
            # End workaround

            if not i_am_owner:
                # Don't remove it from the in_flight list
                continue
            try:
                del self.__inflight[key]
            except KeyError:
                print >>sys.stderr, "Warning: job not in queue", key, job

    def __make_synchronized(self, f):
        """Local decorator to make a method calling lock acquire/release"""
        def newf(*args, **kw):
            self.__lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                self.__lock.release()
        return newf


if __name__ == "__main__":
    import os, time

    def f(params):
        delay, msg = params
        print "Calling sleep(%f) in %d with msg '%s'" % (delay,
                                                         os.getpid(), msg)
        time.sleep(delay)
        print "End of sleep(%f) in %d with msg '%s'" % (delay,
                                                        os.getpid(), msg)
        return "Slept %fs" % delay


    # We have to create the Pool AFTER the functions to call in it have been
    # defined. Using 3 worker processes
    pool = MultiThreadedPool(3)

    # Small test for apply() first
    print pool.apply(f, ((1.2, "Sleep 1200ms to test apply()"),))

    # Now test map()...
    class TestThread(threading.Thread):
        def __init__(self, params):
            threading.Thread.__init__(self)
            self.__params = params

        def run(self):
            print "Running on", self.__params
            try:
                r = pool.map(f, self.__params)
                print "Got result:", r
            except:
                print "Got exception", sys.exc_info()[0:2]

    jobs = ((1, "Sleep 1s"), (2, "Sleep 2s"), (3, "Sleep 3s"),
            (2.5, "BisSleep 2.5s"))
    # Jobs that will execute the same parallel tasks
    # Note: total duration = 3.5s because we have a pool of 3 processes
    t1 = TestThread(list(jobs))
    t2 = TestThread(list(jobs))
    t3 = TestThread(list(jobs))
    t4 = TestThread(list(jobs))
    t5 = TestThread(list(jobs))

    # jobs with a failure
    jobs = jobs + ((-42, "Invalid negative sleep time"),)
    tfail1 = TestThread(list(jobs))
    tfail2 = TestThread(list(jobs))

    # Starting 1st thread
    t1.start()

    time.sleep(1.5)
    # Starting a 2nd thread, which is asking for the same data t1 is
    # already processing
    t2.start()
    time.sleep(.5)
    t3.start()
    t4.start()
    # Should return at the same time as t1, with the same results

    # Wait for all of them to complete
    time.sleep(4)
    print "### We should start all over again now..."
    t5.start()
    # Starting 2 threads which should fail with an exception
    time.sleep(1)
    tfail1.start()
    tfail2.start()
    # t5 should finish normally, while the 2 others
    # raise an exception at almost the same time

The demo shows that, for the first four threads, only one instance of the "sleep" batch is executed: because these threads are started while the batch is still running, they just wait for the result instead of starting a new batch of sleep()s. The fifth thread and the two tfail threads then show that the code also behaves correctly when the parallelized function raises an exception.

Notice that I had to add a workaround for a limitation in the version of the processing package I am using (0.52). Refer to the code for the details (method _unregister_jobs()). I might send a patch to the "processing" authors.
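The limitation comes down to the difference between waking one waiter and waking all of them. The following Python 3 sketch is not part of the recipe, and `count_woken` is a hypothetical helper. It blocks several threads on one threading.Condition, sends a single notification, and counts how many threads were woken before their timeout expired:

```python
import threading
import time


def count_woken(use_notify_all, n_waiters=3, timeout=1.0):
    """Block n_waiters threads on one Condition, notify once, and return how
    many of them were woken (as opposed to timing out)."""
    cond = threading.Condition()
    state = {"waiting": 0, "woken": 0}

    def waiter():
        with cond:
            state["waiting"] += 1
            # wait() releases the lock while blocked; it returns True when
            # notified and False when the timeout expires (Python 3.2+)
            if cond.wait(timeout):
                state["woken"] += 1

    threads = [threading.Thread(target=waiter) for _ in range(n_waiters)]
    for t in threads:
        t.start()

    # Once we hold the lock and see every waiter counted, all of them are
    # blocked inside wait(), so none of them can miss the notification.
    while True:
        with cond:
            if state["waiting"] == n_waiters:
                if use_notify_all:
                    cond.notify_all()   # what the recipe's workaround adds
                else:
                    cond.notify()       # what ApplyResult._set() did in 0.52
                break
        time.sleep(0.01)

    for t in threads:
        t.join()
    return state["woken"]


print("notify():    ", count_woken(False))   # 1: the others time out
print("notify_all():", count_woken(True))    # 3
```

With notify(), only one waiter returns from wait(), and the others stay blocked until their timeout. Threads sharing an ApplyResult would be in the same position without the workaround, except that the recipe calls job.get() without any timeout.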

3 comments

Jesse Noller 15 years, 7 months ago

Hey David - right now I'm the cheerleader for the inclusion of the processing library into Python core for 2.6 and 3.0, and your recipe looks good. You may want to submit it to the Python bug tracker for inclusion in the bundled version of processing (renamed to multiprocessing). If you want to discuss it, send me an email at jnoller at gmail dot com.

david decotigny (author) 15 years, 7 months ago

Thank you very much, Jesse: I just created issue #3735 in the Python bug tracker.

Some thoughts...

I was thinking of using weak references to automagically remove the "in flight" job requests from the "__inflight" cache when the last thread waiting for a job's result no longer needs it. This seemed nice because it would remove lines 94-100 (of rev. 1), which do exactly this by hand, and it would make sure that "in flight" requests are not removed too early, while other threads still have access to the result object.
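To make the weakref idea concrete, here is a minimal Python 3 sketch (the recipe itself is Python 2) of an "in flight" table kept in a weakref.WeakValueDictionary. An entry disappears by itself once no thread holds the job object any more. `Job` is a hypothetical stand-in for the ApplyResult object, and the immediate removal relies on CPython's reference counting:

```python
import weakref


class Job:
    """Hypothetical stand-in for an in-flight result object (ApplyResult)."""


inflight = weakref.WeakValueDictionary()   # (func, args) -> Job
key = ("getNthPrimeNumber", (100000,))     # illustrative (function, params) key

job_a = Job()               # thread A submits the job...
inflight[key] = job_a
job_b = inflight[key]       # ...thread B finds it and shares the same object

del job_a                   # A is done with the result
print(key in inflight)      # True: B still holds a strong reference
del job_b                   # B is done too
print(key in inflight)      # False: the entry vanished on its own (CPython)
```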

But it seems NOT to be a good idea.

The reason is that we need proper locking whenever we manipulate the self.__inflight dictionary: the deletion callback of a weakref would have to acquire the lock of the MultiThreadedPool when it removes the entry from __inflight. If we used a WeakValueDictionary and added this locking to WeakValueDictionary::_remove(), for example, I assumed that we would have no control over when this _remove() callback gets called, and hence no control over where the locking/unlocking would take place (note: I don't know the interactions between weakrefs and the GC, nor the properties of the GC, so I took the pessimistic path in this assumption).

If _remove() gets called outside the critical section (lines 44-59 of rev. 1), it's fine: it simply locks and unlocks. But what happens if _remove() is called inside this section? The first question is "is it possible?", to which I assumed the answer is "yes, theoretically" (a consequence of my pessimistic assumption above), even though I guess it's quite unlikely. But if it ever happens, what should we do? If we don't do anything, the callback blocks on a lock that its own thread already holds: a deadlock. But if we use a recursive lock, the callback can modify __inflight in the middle of the critical section on lines 51 and 56 (rev. 1): this would be a bug too.
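The concern about where the callback runs can be checked on CPython, where an object is freed and its weakref callbacks run at the exact statement that drops its last strong reference. This Python 3 sketch uses hypothetical names (`Job`, `drop_last_reference_inside`). It drops that reference while holding a lock and lets the callback try the lock without blocking:

```python
import threading
import weakref


class Job:
    """Hypothetical stand-in for an in-flight result object."""


def drop_last_reference_inside(lock):
    """Drop the last strong reference to a weakly-referenced object while
    holding `lock`, and report what the weakref callback observed."""
    events = []

    def on_collected(ref):
        # Runs in whatever thread drops the last strong reference, at the
        # point where it drops it (immediately, under CPython).
        if lock.acquire(blocking=False):
            events.append("callback ran inside the critical section")
            lock.release()
        else:
            events.append("lock busy: a blocking acquire() would deadlock")

    job = Job()
    ref = weakref.ref(job, on_collected)   # the weakref itself must stay alive
    with lock:                             # the "critical section"
        del job                            # last strong reference goes away
        events.append("end of critical section")
    return events


print(drop_last_reference_inside(threading.Lock()))
# ['lock busy: a blocking acquire() would deadlock', 'end of critical section']
print(drop_last_reference_inside(threading.RLock()))
# ['callback ran inside the critical section', 'end of critical section']
```

With a plain Lock, a blocking acquire() in the callback would deadlock. With an RLock, the callback gets in and could modify __inflight in the middle of the critical section. These are the two outcomes described above. On implementations without reference counting (e.g. PyPy), the callback runs later, during a garbage collection, which is even harder to predict.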

So I am leaving the code as it is (rev. 1). This means that there is a small risk that redundant requests are still processed in parallel. Suppose two threads A and B request the same job J. J is processed, A wakes up and removes J from __inflight, and then the CPU switches to another thread C before B wakes up. If C submits another request for the same job J, it will be processed in the background again because J is no longer in __inflight, even though its result is still available: B still holds the result object and will read it when it wakes up. However, IMHO, this case is quite unlikely. So this implementation is at worst slightly sub-optimal, but not dangerous.
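One possible way to avoid both the GC-dependent timing and this race is to count waiters explicitly under the pool's own lock and drop the entry only when the count falls to zero. The Python 3 sketch below is not the author's code. `CoalescingPool`, `waiters()` and `slow_square` are hypothetical names, and the sketch uses a concurrent.futures executor instead of processing.Pool. The demo keeps the job in flight with an Event until five threads share it:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class CoalescingPool:
    """Hypothetical sketch: threads share in-flight jobs, and an entry is
    forgotten only when the LAST thread waiting on it has its result."""

    def __init__(self, executor):
        self._executor = executor
        self._lock = threading.Lock()
        self._inflight = {}   # (func, args) -> [future, number_of_waiters]

    def apply(self, func, args=()):
        key = (func, args)
        with self._lock:
            entry = self._inflight.get(key)
            if entry is None:                   # first requester submits
                entry = [self._executor.submit(func, *args), 0]
                self._inflight[key] = entry
            entry[1] += 1                       # one more interested thread
        try:
            # Future.result() wakes every waiting thread (no notify() issue)
            return entry[0].result()
        finally:
            with self._lock:
                entry[1] -= 1
                if entry[1] == 0:               # last waiter: drop the entry
                    del self._inflight[key]

    def waiters(self, func, args=()):
        """Demo helper: how many threads currently share this job."""
        with self._lock:
            entry = self._inflight.get((func, args))
            return 0 if entry is None else entry[1]


gate = threading.Event()
calls = []                      # every real computation is recorded here


def slow_square(x):
    calls.append(x)
    gate.wait()                 # keep the job "in flight" until released
    return x * x


results = []
with ThreadPoolExecutor(max_workers=3) as executor:
    pool = CoalescingPool(executor)
    threads = [threading.Thread(
                   target=lambda: results.append(pool.apply(slow_square, (7,))))
               for _ in range(5)]
    for t in threads:
        t.start()
    while pool.waiters(slow_square, (7,)) < 5:   # all 5 threads share one job
        time.sleep(0.01)
    gate.set()
    for t in threads:
        t.join()

print(results)   # [49, 49, 49, 49, 49]
print(calls)     # [7]: computed once for five requests
```

In the A/B/C scenario above, C would find J still registered (B has not released it yet) and reuse its result instead of resubmitting it. For CPU-bound work, a ProcessPoolExecutor could be passed instead, as long as the function and its arguments are picklable.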

david decotigny (author) 15 years, 7 months ago

Just updated the code to my latest version (=> rev. 3): added support for multiprocessing (Python 2.6b3) and an apply() method (+ code factorization through a new _apply_async() method).