#!/usr/bin/env python

"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.

The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield expressions in the
generator and the send() method on the executor.

References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""

import os
import sys

import threading
import traceback

from Queue import Queue, Empty

try:
    from functools import wraps
except ImportError:
    def wraps(wrapped):
        """A naive implementation of the wraps() decorator for Python <2.5"""
        def wrap(wrapper):
            wrapper.__name__ = wrapped.__name__
            wrapper.__doc__ = wrapped.__doc__
            wrapper.__module__ = wrapped.__module__
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper
        return wrap

if __name__ == '__main__':
    import optparse
    import time

__version__ = '$Revision: 1538 $'.split()[1]

__usage__ = 'usage: %prog [options]'


def execute(exec_factory, *exargs, **exkeys):
    """Decorator which associates a generator with a corresponding executor.
    Calling the decorated generator starts the executor (which must be a
    callable) and then returns it.

    The *exec_factory* argument is a callable (such as a class or function)
    which takes a generator as its first argument and returns a callable
    executor. *exargs* and *exkeys* are passed to the factory callable.

    A generator function decorated with this decorator gains the additional
    (undeclared) keyword parameter *exc_handler*, which is intercepted and
    passed to the executor factory rather than the generator function.
    """
    def exec_wrapper(generator):
        @wraps(generator)
        def work_factory(*genargs, **genkeys):
            if "exc_handler" in genkeys:
                exkeys["exc_handler"] = genkeys.pop("exc_handler")
            work_iter = generator(*genargs, **genkeys)
            executor = exec_factory(work_iter, *exargs, **exkeys)()
            return executor
        return work_factory
    return exec_wrapper
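
# For example (names here are illustrative only): the extra decorator
# arguments become the executor factory's arguments, and exc_handler passed
# at call time goes to the factory, not to count():
#
#     @execute(Executor)                     # exec_factory=Executor
#     def count(n):
#         for i in range(n):
#             print i
#             yield
#
#     count(3, exc_handler=my_handler)       # my_handler is hypothetical; it
#                                            # is called if count() raises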


class Executor:
    """A skeletal base class for executors which delegates to the enclosed
    generator. Not required, but useful.

    Does not implement any context switching or asynchronous behavior; it is
    expected that subclasses will reimplement the __call__() method as
    appropriate.
    """
    def __init__(self, generator, exc_handler=traceback.print_exc):
        """*exc_handler* is a callable which will be called by _handle_exit()
        if the generator raises an exception other than StopIteration. If it
        is None, exceptions (including StopIteration) are reraised.
        """
        self.__generator = generator
        self.__exc_handler = exc_handler

    def __iter__(self): return self

    def __getattr__(self, attr):
        """Cheap delegation to the generator"""
        return getattr(self.__generator, attr)

    def __call__(self):
        """Starts the executor, which then iterates the generator to
        completion, changing contexts as appropriate at each yield.

        The default implementation contains no asynchronous execution or
        context changes.  Execution stops when the generator stops iterating.
        """
        while not self.stopped(): self.next()
        return self

    def _handle_exit(self, isStop=False):
        """Discards the generator on exit, so that generator exit can be
        checked by calling stopped() rather than wrapping everything in
        try...except. Calls a handler (passed to __init__() in *exc_handler*)
        if the generator raised an exception other than StopIteration;
        reraises exceptions (including StopIteration) if the handler is None.

        *isStop* is True if the generator raised StopIteration.
        """
        self.__generator = None
        if not self.__exc_handler: raise
        elif not isStop: self.__exc_handler()
        return None

    def stopped(self):
        """Checks whether our generator has exited and been discarded."""
        return self.__generator is None

    def __call_gen(self, method, *args, **keys):
        """Wrapper around the generator's methods which calls _handle_exit()
        when the generator exits or raises an exception."""
        if not self.stopped():
            try:
                m = getattr(self.__generator, method)
                return m(*args, **keys)
            except StopIteration:
                return self._handle_exit(True)
            except:
                return self._handle_exit(False)
        elif self.__exc_handler is None: raise StopIteration()
        else: return None

    def throw(self, *args, **keys):
        """Wrapper around the generator's throw() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("throw", *args, **keys)

    def close(self):
        """Wrapper around the generator's close() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("close")

    def next(self):
        """Wrapper around the generator's next() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("next")

    def send(self, value):
        """Wrapper around the generator's send() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("send", value)
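
# A sketch of a subclass which feeds values back into the generator through
# send(), illustrating the two-way communication described in the module
# docstring (CountingExecutor is an invented name, not part of the recipe):
#
#     class CountingExecutor(Executor):
#         def __call__(self):
#             count = 0
#             self.next()                    # advance to the first yield
#             while not self.stopped():
#                 count += 1
#                 self.send(count)           # the yield expression in the
#             return self                    # generator evaluates to count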


class ThreadExecutor(Executor):
    """Executes alternate iterations of a generator asynchonously (in a
    separate thread) and synchronously (through a callable, which usually
    queues into an event queue).
    """
    def __init__(self, generator, synchronizer, *args, **kwargs):
        """*synchronizer* is a callable which will cause a callable passed to
        it to execute in the synchronous context.  This is usually done by
        queueing the passed callable in some form of event dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator, *args, **kwargs)

    def __call__(self):
        """If the generator has not stopped, iterate the generator in a
        separate thread and then call __finish().
        """
        if not self.stopped():
            try:
                threading.Thread(target=lambda:(self.next(), self.__finish())).start()
            except:
                self.throw(*sys.exc_info())
        return self

    def __finish(self):
        """If the generator has not stopped, iterate the generator through the
        synchronizer, and call self() to continue iterating.
        """
        if not self.stopped():
            try:
                self.__synchronizer(lambda:(self.next(), self()))
            except:
                self.throw(*sys.exc_info())
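
# A sketch of wiring a ThreadExecutor to an event queue (the demo under
# __main__ below is the full version; background_work/update_display are
# hypothetical placeholders):
#
#     events = Queue()
#
#     @execute(ThreadExecutor, events.put)
#     def task():
#         background_work()     # runs in a separate thread
#         yield
#         update_display()      # runs wherever the queued callable is
#         yield                 # invoked, e.g. events.get()() in a GUI loop
#
#     task()                    # starts the first threaded segment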


class QueuedGeneratorWrapper:
    """A wrapper around a generator which reimplements the generator methods
    so that task_done() is called on a Queue when the generator terminates."""
    def __init__(self, generator, queue):
        self.__generator = generator
        self.__queue = queue

    def __call_gen(self, method, *args, **keys):
        if self.__generator is not None:
            try:
                m = getattr(self.__generator, method)
                return m(*args, **keys)
            except:
                self.__generator = None
                self.__queue.task_done()
                raise
        else: raise StopIteration()

    def throw(self, *args, **keys):
        return self.__call_gen("throw", *args, **keys)

    def close(self):
        return self.__call_gen("close")

    def next(self):
        return self.__call_gen("next")

    def send(self, value):
        return self.__call_gen("send", value)


class ExecutionQueue(Queue):
    """An executor factory that maintains a queue of Executors which are
    automatically dequeued and executed in sequence.

    An instance of this class can be passed to the @execute decorator,
    followed by the Executor class (and its arguments) that will be used to
    execute the decorated generator.
    """
    def __init__(self, *qargs, **qkeys):
        """*qargs* and *qkeys* are passed to the underlying Queue object."""
        Queue.__init__(self, *qargs, **qkeys)
        self.__current_exec = None
        self.__exec_mutex = threading.Lock()

    def __call__(self, generator, exec_class, *exargs, **exkeys):
        """Create an executor, wrapping the generator in an object that starts
        the next queued executor when this one exits."""
        executor = exec_class(QueuedGeneratorWrapper(generator, self), *exargs, **exkeys)
        return lambda: self.put_nowait(executor)

    def task_done(self):
        """Start up the next task in the queue as each one completes."""
        Queue.task_done(self)
        self.__next(isRunning=True)

    def __next(self, isRunning=False):
        """Dequeue and start the next executor.

        *isRunning* states whether the caller expects an executor to be
        running; the next executor is dequeued and started only if that
        expectation matches the actual state (so put() starts execution only
        when idle, and task_done() advances only after a running task)."""
        self.__exec_mutex.acquire()
        if isRunning != (self.__current_exec is None):
            try:
                self.__current_exec = self.get_nowait()
            except Empty:
                self.__current_exec = None
                return
            finally:
                self.__exec_mutex.release()
            self.__current_exec()
        else:
            self.__exec_mutex.release()

    def put(self, item, *args, **kwargs):
        """Queue an executor, and start execution if it is not running."""
        Queue.put(self, item, *args, **kwargs)
        self.__next()
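
# Passing an ExecutionQueue as the factory to @execute serializes workers:
# each decorated call only enqueues its executor, and the queue starts the
# next one as the previous generator exits. A sketch (crunch is a
# hypothetical placeholder; the __main__ demo below is the full version):
#
#     execq = ExecutionQueue()
#     events = Queue()
#
#     @execute(execq, ThreadExecutor, events.put)
#     def job(n):
#         crunch(n)             # runs in a worker thread
#         yield
#
#     job(1); job(2)            # both enqueue; job(2) starts only after
#                               # job(1) exits and events.get()() has run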


if __name__ == '__main__':
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
            help='Number of workers to create (default %default)')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
            help='Number of times to iterate each worker (default %default)')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
            help='Timeout for event loop (default %default sec.)')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
            help='Worker delay to simulate work (default %default sec.)')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()
    eventq = Queue()
    execq = ExecutionQueue()

    def printThread(name, action):
        printLock.acquire()
        print "%s loop %s in %s of %d threads" % (name, action,
            threading.currentThread().getName(), threading.activeCount())
        printLock.release()

    def loop(looptime=0.5):
        """A simple event queue loop."""
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                event = eventq.get(timeout=looptime)
                printThread(" Event", "executing event")
                if callable(event): event()
            except Empty:
                printThread(" Event", "running")

    @execute(execq, ThreadExecutor, eventq.put)
    def work(wnum, loops=2, worktime=2.0):
        """An example generator which executes a simple loop."""
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            time.sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        work(x, loops=options.loops, worktime=options.worktime, exc_handler=traceback.print_exc)
    loop(looptime=options.looptime)
