## @brief A simple variant of processing.Pool that accepts requests
# from different threads.
# Import 'multiprocessing' package (formerly known as 'processing'):
try:
# Tested with python 2.6 b3
from multiprocessing import Pool
except ImportError:
# Tested with standalone processing 0.52
from processing import Pool
import threading, sys
class MultiThreadedPool:
    """
    A simple variant of processing.Pool that accepts requests
    from different threads: makes sure that requests being processed by
    the worker processes are not redundant.

    When a thread B submits a request which is already being processed
    in the background for another thread A, then B doesn't re-submit the
    request: it waits for the same result object as A.

    This package makes the following assumption:
    - the result of the function to call is entirely determined by its
      arguments (resp. "function", "params")

    As a consequence, in order to determine whether a "request" has
    already been submitted by another thread, we ONLY compare the
    couples (function, params). If a submitted request has the same
    couple (function, params) as a request in progress, then we wait
    for this request to be completed (valid result, or exception).

    This Pool should be safe wrt exceptions in the remote function.
    Only the apply(), map() and imap() methods are implemented.
    """
    __lock = None     # threading.Lock protecting __inflight and pool methods
    __inflight = None # dict: (function, params) -> ApplyResult of job in progress
    __workers = None  # underlying processing/multiprocessing Pool object

    def __init__(self, processes=None, initializer=None, initargs=()):
        """See processing.Pool.__init__()"""
        self.__workers = Pool(processes, initializer, initargs)
        self.__inflight = dict()
        self.__lock = threading.Lock()
        # Apply locking decorator on every method that must run with
        # __lock held: job unregistration and pool shutdown
        self._unregister_jobs = self.__make_synchronized(self._unregister_jobs)
        self.close = self.__make_synchronized(self.__workers.close)
        self.terminate = self.__make_synchronized(self.__workers.terminate)
        self.join = self.__make_synchronized(self.__workers.join)

    def apply(self, func, args=()):
        """Equivalent to processing.Pool.apply(), but without the kwds{}
        argument: return func(*args), re-raising in the calling thread
        any exception raised by the remote call"""
        self.__lock.acquire()
        try:
            key, job, i_am_owner = self._apply_async(func, args)
        finally:
            self.__lock.release()
        # Wait for the result outside the lock, so that other threads
        # may submit or collect jobs in the meantime
        try:
            return job.get()
        finally:
            self._unregister_jobs([(key, job, i_am_owner)])

    def imap(self, func, iterable):
        """Equivalent to processing.Pool.imap(), but without the
        "chunksize" argument: yield func(param) for each param of
        iterable, in order"""
        jobs = [] # list of tuples (key, result_object, bool_i_started_the_job)
        # Build the list of jobs started in the background
        try:
            self.__lock.acquire()
            try:
                for param in iterable:
                    jobs.append(self._apply_async(func, (param,)))
            finally:
                self.__lock.release()
        except:
            # The iterable (or the submission itself) failed half-way:
            # unregister the jobs we already started, otherwise the
            # owned ones would stay in __inflight forever
            self._unregister_jobs(jobs)
            raise
        # Wait for everybody
        try:
            for key, job, i_am_owner in jobs:
                yield job.get()
        finally:
            self._unregister_jobs(jobs)

    def map(self, func, iterable):
        """Equivalent to processing.Pool.map(), but without the
        "chunksize" argument"""
        return list(self.imap(func, iterable))

    def _apply_async(self, func, args):
        """Return a tuple (inflight_key, applyResult object, i_am_owner).
        Must be called with __lock held."""
        key = (func, args)
        try:
            # Job already started by somebody else: share its result
            job = self.__inflight[key]
            return key, job, False
        except KeyError:
            # We have to start a new job
            job = self.__workers.apply_async(func, args)
            self.__inflight[key] = job
            return key, job, True

    def _unregister_jobs(self, jobs):
        """
        Remove all the given "in flight" jobs (only the owner of a job
        actually deletes it from __inflight).
        Due to a limitation of processing 0.52, we have to wake up
        additional threads waiting for the result by hand. The correct
        fix to processing would be to replace self._cond.notify() in
        ApplyResult::set() by self._cond.notifyAll()
        """
        for key, job, i_am_owner in jobs:
            # Begin workaround
            # processing.ApplyResult._set doesn't call notifyAll !
            # we have to do it ourselves.
            # Don't move it: nothing guarantees
            # that the owner will be the 1st to wake up !
            # Newer multiprocessing versions wake every waiter
            # themselves (via a threading.Event) and expose no _cond
            # attribute: skip the workaround there instead of crashing.
            cond = getattr(job, "_cond", None)
            if cond is not None:
                cond.acquire()
                cond.notifyAll()
                cond.release()
            # End workaround
            if not i_am_owner:
                # Don't remove it from the in_flight list
                continue
            try:
                del self.__inflight[key]
            except KeyError:
                # Same output as the original "print >>sys.stderr" form,
                # but parseable by both Python 2 and 3
                sys.stderr.write("Warning: job not in queue %s %s\n"
                                 % (key, job))

    def __make_synchronized(self, f):
        """Local decorator: return a version of f() which runs with
        __lock acquired, releasing it even on exception"""
        def newf(*args, **kw):
            self.__lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                self.__lock.release()
        return newf
if __name__ == "__main__":
import os, time
def f(params):
delay, msg = params
print "Calling sleep(%f) in %d with msg '%s'" % (delay,
os.getpid(), msg)
time.sleep(delay)
print "End of sleep(%f) in %d with msg '%s'" % (delay,
os.getpid(), msg)
return "Slept %fs" % delay
# We have to create the Pool AFTER the functions to call in it have been
# defined. Using 3 worker processes
pool = MultiThreadedPool(3)
# Small test for apply() first
print pool.apply(f, ((1.2, "Sleep 1200ms to test apply()"),))
# Now test map()...
class TestThread(threading.Thread):
def __init__(self, params):
threading.Thread.__init__(self)
self.__params = params
def run(self):
print "Running on", self.__params
try:
r = pool.map(f, self.__params)
print "Got result:", r
except:
print "Got exception", sys.exc_info()[0:2]
jobs = ((1, "Sleep 1s"), (2, "Sleep 2s"), (3, "Sleep 3s"),
(2.5, "BisSleep 2.5s"))
# Jobs that will execute the same parallel tasks
# Note: total duration = 3.5s because we have a pool of 3 processes
t1 = TestThread(list(jobs))
t2 = TestThread(list(jobs))
t3 = TestThread(list(jobs))
t4 = TestThread(list(jobs))
t5 = TestThread(list(jobs))
# jobs with a failure
jobs = jobs + ((-42, "Invalid negative sleep time"),)
tfail1 = TestThread(list(jobs))
tfail2 = TestThread(list(jobs))
# Starting 1st thread
t1.start()
time.sleep(1.5)
# Starting a 2nd thread, which is asking for the same data t1 is
# already processing
t2.start()
time.sleep(.5)
t3.start()
t4.start()
# Should return at the same time as t1, with the same results
# Wait for all of them to complete
time.sleep(4)
print "### We should start all over again now..."
t5.start()
# Starting 2 threads which should fail with an exception
time.sleep(1)
tfail1.start()
tfail2.start()
# 1 Thread should have finished, the 2 others
# returned en exception almost at the same time