#!/usr/bin/env python

"""worker.py: Executes inline code across multiple execution contexts.

The inline code to be executed is contained in a generator, which contains a
`yield` statement to signal each context change. A decorator added to the
generator function wraps the returned generator in the executor when the
generator function is called. Calling the returned executor iterates the
generator to completion, handling the context switching at each `yield`. The
executor and generator can communicate values through the `yield` statements
and `send()` method of the generator.

An example is provided of an executor which executes alternate iterations of a
generator asynchronously (in a `Thread`) and synchronously (in an event loop).

References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""

import threading
from sys import exc_info
from traceback import print_exc
from functools import partial, wraps
from Queue import Queue, Empty

if __name__ == '__main__':
    import optparse
    from time import sleep

__version__ = '$Revision: 2539 $'.split()[1]

__usage__ = 'usage: %prog [options]'


def execute(exec_factory, *exargs, **exkeys):
    """Wrap a returned generator in an executor.

    The returned executor can then be called to iterate the generator to
    completion. The executor should also implement the signature of the
    returned generator.

    The `exec_factory` argument is a callable (such as a class or function)
    which takes a generator as its first argument and returns a callable
    executor.

    Generator functions decorated with `execute` can be passed arguments in
    three different places:
    * in the decorator (`exargs` and `exkeys`); these arguments are passed to
        `exec_factory` when the executor is instantiated;
    * in the call to the wrapped generator function; these arguments are
        passed unchanged when instantiating the generator; and
    * in the call to the executor returned by the wrapped generator function.
    """
    def exec_wrapper(generator):
        @wraps(generator)
        def work_factory(*genargs, **genkeys):
            work_iter = generator(*genargs, **genkeys)
            return exec_factory(work_iter, *exargs, **exkeys)
        return work_factory
    return exec_wrapper
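

# Example (hypothetical names): the three places arguments can be supplied to
# a decorated generator function.
#
#     @execute(SomeExecutor, factory_arg, exc_handler=None)  # passed to exec_factory
#     def work(task_id, retries=3):                          # passed to the generator
#         ...
#         yield
#
#     work(42)(start_value)                                  # passed to the executor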


class Executor:
    """A skeletal base class for executors.

    Delegates to the enclosed generator so as not to change the signature.
    Subclasses must implement the `_execute()` method.
    """
    def __init__(self, generator, exc_handler=print_exc):
        """Construct an executor.

        If `exc_handler` is not `None` and any method of the generator raises
        an exception other than `StopIteration`, `exc_handler` will be called;
        otherwise, exceptions (including `StopIteration`) are reraised to the
        caller. Thus, an implementation of Executor can call the exception
        handler by calling `self.throw(*sys.exc_info())`; the exception will
        be handled by the generator, handled by the exception handler (if
        any), or thrown back to the caller.
        """
        self.__generator = generator
        self.__executing = False
        self.__exc_handler = exc_handler
        self.__throw = generator.throw
        self.__send = generator.send
        self.__next = generator.next
        self.__close = generator.close

    def __iter__(self): return self

    def __call_gen(self, method, *args, **keys):
        """If the generator exits, discard it and call `_handle_exit()`."""
        if not self.stopped():
            try:
                return method(*args, **keys)
            except StopIteration:
                return self._handle_exit(True)
            except:
                return self._handle_exit(False)
        elif self.__exc_handler is None: raise StopIteration()
        else: return None

    def __call__(self, *args, **keys):
        """Start the executor. May only be called once per executor.

        Returns the executor, as a convenience for chained calls.

        Subclasses should not override this method, but should implement the
        `_execute()` method to iterate the generator to completion, changing
        contexts as appropriate at each `yield` statement.

        If the `exc_handler` key is present, it will be removed and will
        override the default exception handler.
        """
        if not self.__executing:
            self.__executing = True
            exc_handler = keys.pop('exc_handler', None)
            if exc_handler is not None: self.__exc_handler = exc_handler
            try:
                self._execute(*args, **keys)
            except:
                self.throw(*exc_info())
        else:
            raise ValueError("executor already executing." if self.__generator else "executor already complete.")
        return self

    def _execute(self):
        """Start the executor.

        Subclasses must implement the `_execute()` method to iterate the
        generator to completion, changing contexts as appropriate at each
        `yield` statement.
        """
        raise NotImplementedError("_execute() must be implemented in subclass")

    def _handle_exit(self, isStop=False):
        """Handle a generator exit.

        Discards the generator, so that generator exit can be checked by
        calling `stopped()`, rather than wrapping every call to the generator
        in a `try...except` clause. Calls `exc_handler` if the generator
        raises an exception other than `StopIteration`; reraises exceptions
        (including `StopIteration`) if `exc_handler` is None.

        `isStop` is `True` if the generator raised `StopIteration`.
        """
        self.__generator = None
        if not self.__exc_handler: raise
        elif not isStop: self.__exc_handler()
        return None

    def stopped(self):
        """Check whether the generator has exited."""
        return self.__generator is None

    def throw(self, *args, **keys):
        """If the generator exits, discard it and call `_handle_exit()`."""
        return self.__call_gen(self.__throw, *args, **keys)

    def close(self):
        """If the generator exits, discard it and call `_handle_exit()`."""
        return self.__call_gen(self.__close)

    def next(self):
        """If the generator exits, discard it and call `_handle_exit()`."""
        return self.__call_gen(self.__next)

    def send(self, value):
        """If the generator exits, discard it and call `_handle_exit()`."""
        return self.__call_gen(self.__send, value)
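

# A minimal example subclass (illustrative sketch; not used by the demo below):
# it implements `_execute()` and simply drives the generator to completion in
# the calling thread. `stopped()` becomes True once the generator exits (the
# exit is absorbed by the base class), which ends the loop.
class SynchronousExecutor(Executor):
    """Iterate the wrapped generator to completion in the current thread."""
    def _execute(self):
        while not self.stopped():
            self.next()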


class ThreadExecutor(Executor):
    """Executes alternate iterations asynchonously and synchronously.

    Asynchronous iterations are executed in a separate thread; synchronous
    iterations are executed through a callable, which usually queues into an
    event queue.
    """
    def __init__(self, generator, synchronizer, exc_handler=print_exc):
        """Construct a threaded executor.

        `synchronizer` is a callable which executes a callable passed to it in
        the synchronous context, usually by queueing the passed callable in an
        event dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator, exc_handler)

    def _execute(self):
        """Check for exit, iterate once in a separate thread, and call `__finish()`."""
        if not self.stopped():
            try:
                threading.Thread(target=lambda:(self.next(), self.__finish())).start()
            except:
                self.throw(*exc_info())

    def __finish(self):
        """Check for exit, iterate once in the synchronous context, and call `self()`."""
        if not self.stopped():
            try:
                self.__synchronizer(lambda:(self.next(), self._execute()))
            except:
                self.throw(*exc_info())
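

# Example (hypothetical names): any callable that schedules work on the
# synchronous side can serve as the synchronizer, such as a GUI toolkit's
# idle hook or, as in the demo at the bottom of this module, `put` on an
# event queue drained by a loop.
#
#     @execute(ThreadExecutor, eventq.put)
#     def job():
#         do_blocking_io()     # first iteration: runs in a worker Thread
#         yield
#         update_display()     # next iteration: runs when the loop pops the callable
#
#     job()()                  # instantiate the generator, then start the executor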


class GeneratorWrapper(Executor):
    """An executor which turns a generator into a callable."""
    def __init__(self, generator, iterate_once=False, exc_handler=None):
        """If `iterate_once` is `True`, the generator is iterated once (by
        calling `next()`) immediately after construction, in order to be able
        to pass the parameters of the first `__call__()` to the generator by
        calling `send()`. Any yielded value is discarded. Note that if the
        first `__call__()` passes no arguments or a single `None`, the first
        iteration will succeed, and the generator need not be iterated once to
        initialize it.

        Note also that the default exception handler for a wrapper is `None`,
        so that exceptions are raised to the caller of `__call__()`.
        """
        Executor.__init__(self, generator, exc_handler)
        if iterate_once: generator.next()

    def __call__(self, *args, **keys):
        """Iterate the generator.

        Packages the parameters in the most reasonable fashion, calls
        `next()` or `send()`, and returns the yielded value.

        Overrides `__call__()` rather than `_execute()`, because it doesn't
        iterate to completion.
        """
        if not keys:
            if not args:
                return self.next()
            elif len(args) == 1:
                return self.send(args[0])
            else:
                return self.send(args)
        else:
            if not args:
                return self.send(keys)
            else:
                return self.send((args, keys))
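

# Example (hypothetical usage): wrapping a stateful generator as a plain
# callable. Each call resumes the generator, following the argument-packaging
# rules of `__call__()` above.
#
#     @execute(GeneratorWrapper, iterate_once=True)
#     def running_total():
#         total = 0
#         while True:
#             total += yield total
#
#     accumulate = running_total()   # a GeneratorWrapper instance
#     accumulate(5)                  # send(5) -> returns 5
#     accumulate(3)                  # send(3) -> returns 8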


class QueuedGeneratorWrapper:
    """Call `task_done()` on a `Queue` when the generator terminates."""
    def __init__(self, generator, queue):
        self.__generator = generator
        self.__queue = queue

    def __call_gen(self, method, *args, **keys):
        if self.__generator is not None:
            try:
                return method(*args, **keys)
            except:
                self.__generator = None
                self.__queue.task_done()
                raise
        else: raise StopIteration()

    def __getattr__(self, attr):
        """Delegate to the generator."""
        if self.__generator is None:
            raise AttributeError("no generator in QueuedGeneratorWrapper")
        else:
            attr = getattr(self.__generator, attr)
            return partial(QueuedGeneratorWrapper.__call_gen, self, attr) if callable(attr) else attr


class ExecutionQueue(Queue):
    """A queue of Executors which are dequeued and executed in sequence.

    An instance of this class can be passed to the `@execute` decorator,
    followed by the executor factory and its arguments that will be used to
    execute the decorated generator function.  When the executors returned by
    the decorated generator functions are called, they will be queued for
    execution rather than executing immediately.
    """
    def __init__(self, *qargs, **qkeys):
        """Construct an execution queue.

        `qargs` and `qkeys` are passed to the underlying `Queue` object.
        """
        Queue.__init__(self, *qargs, **qkeys)
        self.__current_exec = None
        self.__exec_mutex = threading.Lock()

    def __call__(self, generator, exec_class, *exargs, **exkeys):
        """Create an executor and return a function which will queue it.

        Wrap the generator in an object which starts the next queued executor
        when it exits.
        """
        executor = exec_class(QueuedGeneratorWrapper(generator, self), *exargs, **exkeys)
        return lambda *args, **keys: self.put_nowait((executor, args, keys))

    def task_done(self):
        """Start up the next task in the queue as each one completes."""
        Queue.task_done(self)
        self.__next(isRunning=True)

    def put(self, item, *args, **kwargs):
        """Queue an executor, along with its arguments.

        Start executing the queue if this is the first entry."""
        Queue.put(self, item, *args, **kwargs)
        self.__next()

    def flush(self):
        """Empty the execution queue and then close the current executor.

        Calls `close()` on all executors in the queue (after calling `next()`
        to initialize them) as well as the running executor. Executors should
        clean up when `close()` is called, and the associated generators must
        catch the resulting `GeneratorExit` exception and clean up.
        """
        self.__exec_mutex.acquire()
        try:
            while True:
                # Empty the execution queue, closing all queued generators
                qexec, args, keys = self.get_nowait()
                qexec.next()
                qexec.close()
        except Empty:
            # Close the current executor; the executor is responsible for cleanup.
            if self.__current_exec is not None:
                self.__current_exec.close()
                self.__current_exec = None
        finally:
            self.__exec_mutex.release()

    def executing(self):
        self.__exec_mutex.acquire()
        try:
            return self.__current_exec is not None
        finally:
            self.__exec_mutex.release()

    def __next(self, isRunning=False):
        """Dequeue and start the next executor.

        `isRunning` is checked against the current execution status before
        dequeuing and starting the next executor."""
        self.__exec_mutex.acquire()
        if isRunning != (self.__current_exec is None):
            try:
                self.__current_exec, args, keys = self.get_nowait()
            except Empty:
                self.__current_exec = None
                return
            finally:
                self.__exec_mutex.release()
            self.__current_exec(*args, **keys)
        else:
            self.__exec_mutex.release()
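

# Example (hypothetical usage): executors queued through an ExecutionQueue run
# one at a time, in order. `flush()` closes every executor that has not yet
# started as well as the one currently running, so the decorated generators
# should catch `GeneratorExit` if they need to clean up.
#
#     execq = ExecutionQueue()
#
#     @execute(execq, ThreadExecutor, eventq.put)
#     def job(n):
#         ...
#         yield
#
#     for n in range(10): job(n)()   # queued; at most one runs at a time
#     execq.flush()                  # discard the backlog, close the running job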


if __name__ == '__main__':
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
            help='Number of workers to create [%default]')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
            help='Number of times to iterate each worker [%default]')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
            help='Timeout for event loop [%default sec]')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
            help='Worker delay to simulate work [%default sec]')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()
    eventq = Queue()
    execq = ExecutionQueue()

    def printThread(name, action):
        printLock.acquire()
        print "%s loop %s in %s of %d threads" % (name, action,
            threading.currentThread().getName(), threading.activeCount())
        printLock.release()

    def loop(looptime=0.5):
        """A simple event queue loop."""
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                event = eventq.get(timeout=looptime)
                printThread(" Event", "executing event")
                if callable(event): event()
            except Empty:
                printThread(" Event", "running")

    @execute(execq, ThreadExecutor, eventq.put, exc_handler=None)
    def work(wnum, loops=2, worktime=2.0):
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        work(x, loops=options.loops, worktime=options.worktime)(exc_handler=print_exc)
    loop(looptime=options.looptime)
