# Welcome, guest | Sign In | My Account | Store | Cart
## @brief A simple variant of processing.Pool that accepts requests
#  from different threads.


# Import 'multiprocessing' package (formerly known as 'processing'):
try:
    # Tested with python 2.6 b3
    from multiprocessing import Pool
except ImportError:
    # Tested with standalone processing 0.52
    from processing import Pool

import threading, sys


class MultiThreadedPool:
    """
    A simple variant of processing.Pool that accepts requests
    from different threads: makes sure that requests being processed by
    the worker processes are not redundant.

    When a thread B submits a request which is already being processed
    in the background for another thread A, then B doesn't re-submit the
    request: it waits for the same result object as A.

    This package makes the following assumption:
    - the result of the function to call is entirely determined by its
      arguments (resp. "function", "params")

    As a consequence, in order to determine whether a "request" has
    already been submitted by another thread, we ONLY compare the
    couples (function, params). If a submitted request has the same
    couple (function, params) as a request in progress, then we wait
    for this request to be completed (valid result, or exception).
    The couple is used as a dictionary key, so the parameters must be
    hashable.

    This Pool should be safe wrt exceptions in the remote function.
    Only the apply(), map() and imap() methods are implemented; close(),
    terminate() and join() are forwarded to the underlying pool while
    holding the internal lock.
    """
    __lock      = None # threading.Lock object
    __inflight  = None # dict: (function, params) -> processing.ApplyResult obj
    __workers   = None # processing.Pool object

    def __init__(self, processes=None, initializer=None, initargs=()):
        """See processing.Pool.__init__()"""
        self.__workers  = Pool(processes, initializer, initargs)
        self.__inflight = dict()
        self.__lock     = threading.Lock()

        # Apply locking decorator on close/terminate/join.
        # _unregister_jobs is wrapped too, so it must never be called
        # while this thread already holds the (non-reentrant) lock.
        self._unregister_jobs = self.__make_synchronized(self._unregister_jobs)
        self.close     = self.__make_synchronized(self.__workers.close)
        self.terminate = self.__make_synchronized(self.__workers.terminate)
        self.join      = self.__make_synchronized(self.__workers.join)

    def apply(self, func, args=()):
        """Equivalent to processing.Pool::apply(), but without the kwds{}
        argument.

        Blocks until the result is available and returns it; whatever
        job.get() raises is propagated to the caller.
        """
        self.__lock.acquire()
        try:
            key, job, i_am_owner = self._apply_async(func, args)
        finally:
            self.__lock.release()

        # Wait for result (outside the lock, so other threads can submit)
        try:
            return job.get()
        finally:
            self._unregister_jobs([(key, job, i_am_owner)])

    def imap(self, func, iterable):
        """Equivalent to processing.Pool.imap(), but without the
        "chunksize" argument.

        This is a generator: nothing is submitted until the first item
        is requested. All the jobs are then submitted at once, and the
        results are yielded in the order of "iterable".
        """
        jobs = [] # list of tuples (key, result_object, bool_i_started_the_job)
        try:
            # Build the list of jobs started in the background
            self.__lock.acquire()
            try:
                for param in iterable:
                    jobs.append(self._apply_async(func, (param,)))
            finally:
                self.__lock.release()

            # Wait for everybody
            for key, job, i_am_owner in jobs:
                yield job.get()
        finally:
            # Also reached when the submission loop above fails half-way:
            # the jobs already registered must not stay in the in-flight
            # table. The lock has been released by then, as required by
            # the synchronized _unregister_jobs.
            self._unregister_jobs(jobs)

    def map(self, func, iterable):
        """Equivalent to processing.Pool.map(), but without the
        "chunksize" argument"""
        return list(self.imap(func, iterable))

    def _apply_async(self, func, args):
        """Return a tuple (inflight_key, applyResult object, i_am_owner).

        Must be called with the internal lock held. i_am_owner is True
        when this call submitted the job to the workers, False when an
        identical request was already in flight and is being shared.
        """
        # tuple() lets callers pass the arguments as a list, which would
        # otherwise make the key unhashable
        key = (func, tuple(args))
        try:
            # Job already started by somebody else
            job = self.__inflight[key]
        except KeyError:
            # We have to start a new job
            job = self.__workers.apply_async(func, args)
            self.__inflight[key] = job
            return key, job, True
        return key, job, False

    def _unregister_jobs(self, jobs):
        """
        Remove all the given "in flight" jobs.

        "jobs" is a list of tuples as returned by _apply_async(). Only
        the jobs owned by the caller are removed from the in-flight
        table.

        Due to a limitation of processing 0.52, we have to wake up
        additional threads waiting for the result by hand. The correct
        fix to processing would be to replace self._cond.notify() in
        ApplyResult::set() by self._cond.notifyAll()
        """
        for key, job, i_am_owner in jobs:
            # Begin workaround
            # processing.ApplyResult._set doesn't call notifyAll !
            # we have to do it ourselves.
            # Don't move it: nothing guarantees
            # that the owner will be the 1st to wake up !
            # Result objects without a "_cond" attribute are left alone
            # (NOTE(review): recent multiprocessing versions appear not
            # to have one - confirm for the targeted version).
            cond = getattr(job, "_cond", None)
            if cond is not None:
                cond.acquire()
                try:
                    # Prefer the modern spelling, fall back on the old one
                    wake_all = getattr(cond, "notify_all", None) \
                               or cond.notifyAll
                    wake_all()
                finally:
                    cond.release()
            # End workaround

            if not i_am_owner:
                # Don't remove it from the in_flight list
                continue
            try:
                del self.__inflight[key]
            except KeyError:
                # Same text as the former print statement, but written in
                # a way that behaves the same on python 2 and python 3
                sys.stderr.write("Warning: job not in queue %s %s\n"
                                 % (key, job))

    def __make_synchronized(self, f):
        """Local decorator to make a method calling lock acquire/release"""
        def newf(*args, **kw):
            self.__lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                self.__lock.release()
        return newf


if __name__ == "__main__":
    # Demo / manual test: several threads request the same slow jobs and
    # must share the results instead of re-submitting them.
    import os
    import time

    def f(params):
        """Sleep for the requested delay and return a summary string.

        "params" is a couple (delay_in_seconds, message); the message is
        only used for the traces printed before and after the sleep.
        """
        delay, msg = params
        print("Calling sleep(%f) in %d with msg '%s'" % (delay,
                                                         os.getpid(), msg))
        time.sleep(delay)
        print("End of sleep(%f) in %d with msg '%s'" % (delay,
                                                        os.getpid(), msg))
        return "Slept %fs" % delay


    # We have to create the Pool AFTER the functions to call in it have been
    # defined. Using 3 worker processes
    pool = MultiThreadedPool(3)

    # Small test for apply() first
    print(pool.apply(f, ((1.2, "Sleep 1200ms to test apply()"),)))

    # Now test map()...
    class TestThread(threading.Thread):
        """Thread calling pool.map(f, params) and printing the outcome."""

        def __init__(self, params):
            threading.Thread.__init__(self)
            self.__params = params

        def run(self):
            print("Running on %s" % (self.__params,))
            try:
                r = pool.map(f, self.__params)
                print("Got result: %s" % (r,))
            except Exception:
                # Failures of the remote function are expected for the
                # "tfail" threads: report them and let the thread end.
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                print("Got exception %s" % (sys.exc_info()[0:2],))

    jobs = ((1, "Sleep 1s"), (2, "Sleep 2s"), (3, "Sleep 3s"),
            (2.5, "BisSleep 2.5s"))
    # Jobs that will execute the same parallel tasks
    # Note: total duration = 3.5s because we have a pool of 3 processes
    t1 = TestThread(list(jobs))
    t2 = TestThread(list(jobs))
    t3 = TestThread(list(jobs))
    t4 = TestThread(list(jobs))
    t5 = TestThread(list(jobs))

    # jobs with a failure
    jobs = jobs + ((-42, "Invalid negative sleep time"),)
    tfail1 = TestThread(list(jobs))
    tfail2 = TestThread(list(jobs))

    # Starting 1st thread
    t1.start()

    time.sleep(1.5)
    # Starting a 2nd thread, which is asking for the same data t1 is
    # already processing
    t2.start()
    time.sleep(.5)
    t3.start()
    t4.start()
    # Should return at the same time as t1, with the same results

    # Wait for all of them to complete (join instead of guessing a delay)
    for t in (t1, t2, t3, t4):
        t.join()
    print("### We should start all over again now...")
    t5.start()
    # Starting 2 threads which should fail with an exception
    time.sleep(1)
    tfail1.start()
    tfail2.start()
    # 1 Thread should have finished, the 2 others
    # returned an exception almost at the same time

    # Wait for the last threads, then shut the worker processes down
    for t in (t5, tfail1, tfail2):
        t.join()
    pool.close()
    pool.join()

# History
#
#   * revision 3 (15 years ago)
#   * previous revisions are not available