processing.Pool (http://pypi.python.org/pypi/processing) is a nice tool to "parallelize" map() on multiple CPUs. However, imagine you have X threads that send the same request Pool.map(getNthPrimeNumber, [100000, 10000000, 10000]) at (almost) the same time. Obviously, you don't want to compute getNthPrimeNumber for 100000, 10000000 and 10000 X times... unless you have 3*X processors available. You would rather have one thread submit the 3 requests, while the X-1 others notice that the requests have already been submitted and simply wait for the results. This is what this code is about: a kind of "transient memoize" for processing.Pool::imap().
```python
## @brief A simple variant of processing.Pool that accepts requests
# from different threads.

# Import 'multiprocessing' package (formerly known as 'processing'):
try:
    # Tested with python 2.6 b3
    from multiprocessing import Pool
except ImportError:
    # Tested with standalone processing 0.52
    from processing import Pool

import threading, sys


class MultiThreadedPool:
    """
    A simple variant of processing.Pool that accepts requests
    from different threads: makes sure that requests being processed by
    the worker processes are not redundant.

    When a thread B submits a request which is already being processed
    in the background for another thread A, then B doesn't re-submit the
    request: it waits for the same result object as A.

    This package makes the following assumption:
    - the result of the function to call is entirely determined by its
      arguments (resp. "function", "params")

    As a consequence, in order to determine whether a "request" has
    already been submitted by another thread, we ONLY compare the
    couples (function, params). If a submitted request has the same
    couple (function, params) as a request in progress, then we wait
    for this request to be completed (valid result, or exception).

    This Pool should be safe wrt exceptions in the remote function.
    Only the apply(), map() and imap() methods are implemented.
    """

    __lock = None      # threading.Lock object
    __inflight = None  # dict: (function, params) -> processing.ApplyResult obj
    __workers = None   # processing.Pool object

    def __init__(self, processes=None, initializer=None, initargs=()):
        """See processing.Pool.__init__()"""
        self.__workers = Pool(processes, initializer, initargs)
        self.__inflight = dict()
        self.__lock = threading.Lock()

        # Apply locking decorator on _unregister_jobs/close/terminate/join
        self._unregister_jobs = self.__make_synchronized(self._unregister_jobs)
        self.close = self.__make_synchronized(self.__workers.close)
        self.terminate = self.__make_synchronized(self.__workers.terminate)
        self.join = self.__make_synchronized(self.__workers.join)

    def apply(self, func, args=()):
        """Equivalent to processing.Pool::apply(), but without the kwds{}
        argument"""
        self.__lock.acquire()
        try:
            key, job, i_am_owner = self._apply_async(func, args)
        finally:
            self.__lock.release()

        # Wait for result
        try:
            return job.get()
        finally:
            self._unregister_jobs([(key, job, i_am_owner)])

    def imap(self, func, iterable):
        """Equivalent to processing.Pool.imap(), but without the
        "chunksize" argument"""
        jobs = []  # list of tuples (key, result_object, bool_i_started_the_job)

        # Build the list of jobs started in the background
        self.__lock.acquire()
        try:
            for param in iterable:
                jobs.append(self._apply_async(func, (param,)))
        finally:
            self.__lock.release()

        # Wait for everybody
        try:
            for key, job, i_am_owner in jobs:
                yield job.get()
        finally:
            self._unregister_jobs(jobs)

    def map(self, func, iterable):
        """Equivalent to processing.Pool.map(), but without the
        "chunksize" argument"""
        return list(self.imap(func, iterable))

    def _apply_async(self, func, args):
        """Return a tuple (inflight_key, applyResult object, i_am_owner).
        Must be called with self.__lock held."""
        key = (func, args)
        try:
            # Job already started by somebody else
            job = self.__inflight[key]
            return key, job, False
        except KeyError:
            # We have to start a new job
            job = self.__workers.apply_async(func, args)
            self.__inflight[key] = job
            return key, job, True

    def _unregister_jobs(self, jobs):
        """
        Remove all the given "in flight" jobs.

        Due to a limitation of processing 0.52, we have to wake up
        additional threads waiting for the result by hand. The correct
        fix to processing would be to replace self._cond.notify() in
        ApplyResult._set() by self._cond.notifyAll()
        """
        for key, job, i_am_owner in jobs:
            # Begin workaround
            # processing.ApplyResult._set doesn't call notifyAll!
            # We have to do it ourselves.
            # Don't move it: nothing guarantees
            # that the owner will be the first to wake up!
            job._cond.acquire()
            job._cond.notifyAll()
            job._cond.release()
            # End workaround

            if not i_am_owner:
                # Don't remove it from the in_flight list
                continue
            try:
                del self.__inflight[key]
            except KeyError:
                print >>sys.stderr, "Warning: job not in queue", key, job

    def __make_synchronized(self, f):
        """Local decorator to make a method calling lock acquire/release"""
        def newf(*args, **kw):
            self.__lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                self.__lock.release()
        return newf


if __name__ == "__main__":
    import os, time

    def f(params):
        delay, msg = params
        print "Calling sleep(%f) in %d with msg '%s'" % (delay,
                                                        os.getpid(), msg)
        time.sleep(delay)
        print "End of sleep(%f) in %d with msg '%s'" % (delay,
                                                       os.getpid(), msg)
        return "Slept %fs" % delay

    # We have to create the Pool AFTER the functions to call in it have been
    # defined. Using 3 worker processes
    pool = MultiThreadedPool(3)

    # Small test for apply() first
    print pool.apply(f, ((1.2, "Sleep 1200ms to test apply()"),))

    # Now test map()...
    class TestThread(threading.Thread):
        def __init__(self, params):
            threading.Thread.__init__(self)
            self.__params = params

        def run(self):
            print "Running on", self.__params
            try:
                r = pool.map(f, self.__params)
                print "Got result:", r
            except:
                print "Got exception", sys.exc_info()[0:2]

    jobs = ((1, "Sleep 1s"), (2, "Sleep 2s"), (3, "Sleep 3s"),
            (2.5, "BisSleep 2.5s"))

    # Threads that will execute the same parallel tasks
    # Note: total duration = 3.5s because we have a pool of 3 processes
    t1 = TestThread(list(jobs))
    t2 = TestThread(list(jobs))
    t3 = TestThread(list(jobs))
    t4 = TestThread(list(jobs))
    t5 = TestThread(list(jobs))

    # Jobs with a failure
    jobs = jobs + ((-42, "Invalid negative sleep time"),)
    tfail1 = TestThread(list(jobs))
    tfail2 = TestThread(list(jobs))

    # Starting 1st thread
    t1.start()
    time.sleep(1.5)
    # Starting a 2nd thread, which is asking for the same data t1 is
    # already processing
    t2.start()
    time.sleep(.5)
    t3.start()
    t4.start()
    # Should return at the same time as t1, with the same results

    # Wait for all of them to complete
    time.sleep(4)
    print "### We should start all over again now..."
    t5.start()

    # Starting 2 threads which should fail with an exception
    time.sleep(1)
    tfail1.start()
    tfail2.start()
    # One thread (t5) should finish normally, and the 2 others should
    # return an exception almost at the same time
```
The demo shows that, for the first four threads, only one instance of the "sleep" batch is executed: the threads are started while the batch is still running, so they just wait for the result instead of starting a new batch of sleep()s. Then the fifth thread and the two tfail threads, which share four of their five jobs, show that the code also works well when the parallelized function raises an exception.
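For comparison, here is a rough Python 3 sketch of the same de-duplication built on concurrent.futures instead of processing.Pool. It is not part of the recipe: DedupExecutor and its methods are hypothetical names, and a ThreadPoolExecutor is assumed so that the example stays self-contained (a ProcessPoolExecutor would additionally need picklable, module-level functions). Concurrent callers that submit the same (func, args) share one Future, and the entry is dropped once the job completes, so a later request starts the computation over, like t5 in the demo. A Future wakes every thread blocked in result(), so no notifyAll() workaround is needed here.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class DedupExecutor:
    """Hypothetical sketch: callers that submit the same (func, args) while a
    job is in flight share a single Future instead of resubmitting it."""

    def __init__(self, executor):
        self._executor = executor
        self._lock = threading.Lock()  # protects self._inflight
        self._inflight = {}            # (func, args) -> Future

    def submit(self, func, *args):
        key = (func, args)
        with self._lock:
            future = self._inflight.get(key)
            if future is not None:
                return future  # already in flight: share it
            future = self._executor.submit(func, *args)
            self._inflight[key] = future
        # Registered outside the lock on purpose: if the job has already
        # finished, add_done_callback() calls _forget() right away in this
        # thread, and _forget() needs the (non-reentrant) lock.
        future.add_done_callback(lambda f, k=key: self._forget(k, f))
        return future

    def _forget(self, key, future):
        with self._lock:
            # Only remove the entry if it still refers to this very job
            if self._inflight.get(key) is future:
                del self._inflight[key]

    def map(self, func, iterable):
        # Submit everything first, then wait, like the recipe's imap();
        # result() re-raises the job's exception in every waiting thread.
        futures = [self.submit(func, x) for x in iterable]
        return [f.result() for f in futures]
```

Five threads mapping the same list at the same time should get identical results while each value is computed only once.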
Notice that I had to add a workaround for a limitation in the version of the processing package I am using (0.52). Refer to the code (method _unregister_jobs()) for the details. I might send a patch to the "processing" authors.
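The limitation is easy to reproduce with a plain condition variable: notify() wakes only one of the threads blocked in wait(), while notifyAll() wakes all of them. In the recipe, every thread sharing a job ends up waiting on the same ApplyResult condition, so a single notify() releases only one of them. A minimal Python 3 sketch with threading.Condition (notify_all() is the Python 3 spelling of notifyAll(); count_woken() is a hypothetical helper, not part of processing):

```python
import threading
import time


def count_woken(wake_all, n_waiters=3, timeout=1.0):
    """Block n_waiters threads on one Condition, signal it once, and return
    how many were woken by the signal (the others time out)."""
    cond = threading.Condition()
    state = {"waiting": 0, "woken": 0}

    def waiter():
        with cond:
            state["waiting"] += 1
            # wait() returns True when notified, False when the timeout expires
            if cond.wait(timeout):
                state["woken"] += 1

    threads = [threading.Thread(target=waiter) for _ in range(n_waiters)]
    for t in threads:
        t.start()

    # Each waiter increments the counter and enters wait() while holding the
    # lock, so seeing the full count under the lock means all are waiting.
    while True:
        with cond:
            if state["waiting"] == n_waiters:
                if wake_all:
                    cond.notify_all()  # what the recipe's workaround does
                else:
                    cond.notify()      # what ApplyResult._set() does in 0.52
                break
        time.sleep(0.01)

    for t in threads:
        t.join()
    return state["woken"]
```

With three waiters, count_woken(False) returns 1 once the two other waiters have timed out, whereas count_woken(True) returns 3.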
Hey David, right now I'm the cheerleader for the inclusion of the processing library into python-core for 2.6 and 3.0, and your recipe looks good. You may want to submit it to the Python bug tracker for inclusion in the bundled version of processing (renamed to multiprocessing). If you want to discuss it, send me an email at jnoller at gmail dot com.
Thank you very much, Jesse: I just created issue #3735 in the Python bug tracker.
Some thoughts...
I was thinking of using weak references to automagically remove the "in flight" job requests from the __inflight cache when the last thread waiting for the job results no longer needs them. This seemed nice because it would remove the code that does this by hand (lines 94-100 of rev. 1), and it would make sure that the "in flight" requests are not removed too early, while other threads still have access to the result object.
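The weak-reference idea can be illustrated with weakref.WeakValueDictionary: the cache holds each job weakly, so its entry disappears by itself once the last thread holding the result object drops it. FakeJob and register() are hypothetical stand-ins for an ApplyResult and for _apply_async(). The removal is performed by a callback internal to WeakValueDictionary, which is exactly the part whose timing the following paragraphs worry about.

```python
import gc
import weakref


class FakeJob:
    """Hypothetical stand-in for a processing.ApplyResult object."""


inflight = weakref.WeakValueDictionary()  # key -> job, held only weakly


def register(key):
    job = FakeJob()
    inflight[key] = job  # the cache alone does not keep the job alive
    return job           # waiting threads hold the strong references


KEY = "getNthPrimeNumber(100000)"
a = register(KEY)   # thread A starts the job
b = inflight[KEY]   # thread B finds it in flight
del a               # A is done with the result
print("after A:", KEY in inflight)  # after A: True (B still holds it)
del b               # B, the last holder, is done
gc.collect()        # not needed on CPython, harmless elsewhere
print("after B:", KEY in inflight)  # after B: False (entry removed)
```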
But it seems NOT to be a good idea.
The reason is that we need proper locking whenever we manipulate the self.__inflight dictionary: the deletion callback of a weakref would have to acquire the lock of the MultiThreadedPool when it removes the entry from __inflight. If we use a WeakValueDictionary and add this locking to WeakValueDictionary::_remove(), for example, I assumed that we had no control over when this _remove() callback would be called, and hence no control over where the locking/unlocking would take place. (Note: I don't know the interactions between weakrefs and the GC, nor the properties of the GC, so I took the pessimistic path in this assumption.)
If _remove() gets called outside the critical section (lines 44-59 of rev. 1), everything is fine: it simply locks and unlocks. But what happens if _remove() is called inside this section? The first question is "is it possible?", to which I assumed the answer is "yes, theoretically" (a consequence of my pessimistic assumption above), even though I guess it's quite unlikely. But if it ever happens, what should we do? If we don't do anything, there will be a deadlock, because threading.Lock is not reentrant. But if we use a recursive lock, we lose the critical section on lines 51 and 56 (rev. 1), since _remove() could then modify __inflight in the middle of it: this would be a bug too.
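Both sides of this dilemma can be checked with a short Python 3 sketch. reentry_would_block() and check_then_act() are hypothetical helpers, and callback() stands for a _remove() fired by the GC in the middle of the critical section. A plain threading.Lock cannot be re-acquired by the thread that holds it (the sketch probes with a non-blocking acquire instead of actually deadlocking), while a threading.RLock lets the callback in, so the lookup made at the start of the section is stale by the time the section acts on it.

```python
import threading


def reentry_would_block(lock):
    """True if the thread holding `lock` would block when acquiring it
    again, e.g. from a weakref callback fired inside the critical section."""
    with lock:
        acquired = lock.acquire(False)  # non-blocking probe: no real deadlock
        if acquired:
            lock.release()
        return not acquired


def check_then_act(lock, inflight, key, callback):
    """Look up `key`, let `callback` run mid-section, then look again."""
    with lock:
        seen_before = key in inflight
        callback()  # stands for a GC-triggered _remove() at this point
        return seen_before, key in inflight


if __name__ == "__main__":
    print(reentry_would_block(threading.Lock()))   # True: plain Lock would deadlock
    print(reentry_would_block(threading.RLock()))  # False: RLock lets it in

    rlock = threading.RLock()
    table = {"J": "job"}

    def remove_j():
        with rlock:  # re-enters the section, since it is the same thread
            table.pop("J", None)

    # J was seen in flight, but it is gone before the section acts on it
    print(check_then_act(rlock, table, "J", remove_j))  # (True, False)
```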
So I am leaving the code as it is (rev. 1). This means there is a small risk that redundant requests are still processed in parallel: two threads A and B request the same job J; J is processed; A is woken up and removes J from __inflight; but then the CPU switches to a third thread C before B is woken up. If C submits another request for J, it will be processed in the background again, because J is no longer in __inflight, even though its result is still available (B still needs it and will read it when it wakes up). IMHO this case is quite unlikely, so this implementation is at worst slightly sub-optimal, not dangerous.
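One way to close that window without weak references, sketched here as a hypothetical variant and not what the recipe does, is to count the threads using each entry under the same lock and delete the entry only when the last of them calls release(). Because release() is called explicitly by each waiting thread rather than by the GC, the lock is never re-entered. In the recipe, start_job would correspond to self.__workers.apply_async(func, args) and release() to the deletion in _unregister_jobs(); with processing 0.52 the notifyAll() workaround would still be needed.

```python
import threading


class InflightTable:
    """Hypothetical variant of the recipe's __inflight dict: an entry is kept
    until the last thread that asked for it calls release()."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # key -> [job, number_of_threads_still_using_it]

    def acquire(self, key, start_job):
        """Return (job, started); start_job() runs only if key is not in flight."""
        with self._lock:
            entry = self._entries.get(key)
            started = entry is None
            if started:
                entry = [start_job(), 0]
                self._entries[key] = entry
            entry[1] += 1
            return entry[0], started

    def release(self, key):
        """Called explicitly by each user once it has read the result."""
        with self._lock:
            entry = self._entries[key]
            entry[1] -= 1
            if entry[1] == 0:
                del self._entries[key]

    def __contains__(self, key):
        with self._lock:
            return key in self._entries


if __name__ == "__main__":
    # Replay the A/B/C scenario described above
    table = InflightTable()

    def start():
        return "job J"  # stands for apply_async(func, args)

    print(table.acquire("J", start))  # A: ('job J', True)
    print(table.acquire("J", start))  # B: ('job J', False)
    table.release("J")                # A has its result and leaves
    print(table.acquire("J", start))  # C: ('job J', False), J is reused
    table.release("J")                # B wakes up and leaves
    table.release("J")                # C leaves
    print("J" in table)               # False: the next request starts afresh
```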
Just updated the code to my latest version (rev. 3): support for multiprocessing (Python 2.6b3), a new apply() method, and some code factorization through the new _apply_async() method.