Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python

"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.

The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield statements in the
generator and the send() method in the executor.

References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""

import os
import sys

import threading
import traceback

try:
    from functools import wraps
except ImportError:
    def wraps(wrapped):
        """Minimal stand-in for functools.wraps(), used when the import of
        the real one fails (Python <2.5).

        Returns a decorator which copies the name, docstring, module and
        attribute dictionary of *wrapped* onto the function it is applied
        to, and then hands that same function back.
        """
        def copy_metadata(wrapper):
            # Same three attributes, in the same order, as a plain
            # attribute-by-attribute copy.
            for attribute in ('__name__', '__doc__', '__module__'):
                setattr(wrapper, attribute, getattr(wrapped, attribute))
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper
        return copy_metadata

if __name__ == '__main__':
    import optparse
    import Queue
    import time

# Version string: the second whitespace-separated field of the revision
# keyword text, i.e. the bare revision number.
__version__ = '$Revision: 1345 $'.split()[1]

# Usage template passed to optparse.OptionParser when run as a script.
__usage__ = 'usage: %prog [options]'


def execute(exec_factory, *exargs, **exkeys):
    """Build a decorator that pairs a generator function with an executor.

    Calling the decorated generator function no longer returns the raw
    generator; instead the generator is created and handed to
    *exec_factory*, and the resulting executor is returned.  Execution is
    then started by calling a method on that executor (usually run()).

    *exec_factory* is any callable (such as a class or function) which
    accepts a generator as its first argument and returns an executor.
    *exargs* and *exkeys* are forwarded to the factory after the generator.
    """
    def bind_executor(generator):
        def make_executor(*genargs, **genkeys):
            # Instantiate the generator with the caller's arguments, then
            # wrap it in an executor built with the decorator's arguments.
            return exec_factory(generator(*genargs, **genkeys),
                                *exargs, **exkeys)
        # Carry the generator function's name/docstring over to the factory.
        return wraps(generator)(make_executor)
    return bind_executor


class Executor:
    """A skeletal base class for executors which delegates generator methods
    to the enclosed generator.  Not required, but useful.

    Does not implement any context switching or asynchronous behavior; it is
    expected that subclasses will override the run() method as appropriate.
    """
    def __init__(self, generator):
        """Wrap *generator*, the iterator this executor will drive."""
        self.__generator = generator

    def __iter__(self):
        """An executor is its own iterator."""
        return self

    def __getattr__(self, attr):
        """Cheap delegation to the generator for any attribute the executor
        does not define itself.

        Raises AttributeError if the attribute cannot be found on the
        wrapped object either.
        """
        # __getattr__ is only consulted after normal lookup has failed.  If
        # the generator slot itself is what is missing (__init__ never ran),
        # reading self.__generator below would re-enter this method without
        # end, so report the missing attribute instead.
        if attr == '_Executor__generator':
            raise AttributeError(attr)
        return getattr(self.__generator, attr)

    def stopped(self):
        """Checks whether our generator has been discarded."""
        return self.__generator is None

    def __advance(self, resume, handleException):
        """Shared implementation of next() and send().

        *resume* is a callable which takes the wrapped generator, advances
        it by one step and returns the yielded value.  Returns that value,
        or None when *handleException* is True and the generator is (or
        becomes) stopped or raises.  When *handleException* is False every
        exception, including StopIteration, propagates to the caller.
        """
        if self.stopped():
            if handleException:
                return None
            raise StopIteration()
        try:
            return resume(self.__generator)
        except (StopIteration, GeneratorExit):
            # The generator is finished; forget it so stopped() reports it.
            self.__generator = None
            if not handleException:
                raise
            return None
        except Exception:
            if not handleException:
                raise
            # sys.exc_info() gives the active exception in a form that is
            # valid syntax on every Python version.
            if not self.handle_exception(sys.exc_info()[1]):
                self.__generator = None
            return None

    def next(self, handleException=False):
        """Wrapper around the generator's next() method which discards the
        generator when a StopIteration or GeneratorExit is encountered;
        allows for testing of generator exits using the stopped() method.

        If the parameter *handleException* is True, exceptions in the
        generator are not reraised by this wrapper. Instead, for exceptions
        other than StopIteration and GeneratorExit, handle_exception() is
        called. If handle_exception() returns a value interpreted as False,
        the generator is discarded.

        Returns the yielded value, or None when an exception was absorbed
        or the generator had already been discarded.
        """
        def resume(generator):
            # Prefer the next() spelling; fall back to __next__() for
            # iterators which only provide the newer protocol method.
            step = getattr(generator, 'next', None)
            if step is None:
                step = generator.__next__
            return step()
        return self.__advance(resume, handleException)

    # Lets the executor itself be iterated where the iterator protocol
    # looks for __next__(); an unused extra attribute elsewhere.
    __next__ = next

    def send(self, value, handleException=False):
        """Wrapper around the generator's send() method which discards the
        generator when a StopIteration or GeneratorExit is encountered;
        allows for testing of generator exits using the stopped() method.

        *value* is passed to the generator's send().  The parameter
        *handleException* has the same meaning as for next(): if True,
        exceptions in the generator are not reraised; for exceptions other
        than StopIteration and GeneratorExit, handle_exception() is called,
        and the generator is discarded if it returns a value interpreted as
        False.

        Returns the yielded value, or None when an exception was absorbed
        or the generator had already been discarded.
        """
        return self.__advance(lambda generator: generator.send(value),
                              handleException)

    def run(self):
        """The run() method starts the executor, which then iterates the
        generator to completion, changing contexts as appropriate at each
        yield.

        The default implementation contains no asynchronous execution or
        context changes; if the generator yields a value interpreted as False,
        stops iterating, or raises an exception, execution terminates.
        """
        while self.next(handleException=True) and not self.stopped():
            pass

    def handle_exception(self, exception):
        """Handle an exception raised by the generator; if this returns a
        value interpreted as False, the generator is discarded. By default,
        just prints a traceback to stderr and returns False.

        The default implementation does not use *exception*; it prints the
        exception currently being handled, so it must be called from within
        the except block (as next() and send() do).
        """
        traceback.print_exc(file=sys.stderr)
        return False


class ThreadExecutor(Executor):
    """Drives a generator by alternating between two contexts: one iteration
    runs asynchronously in a newly started thread, the following one runs
    synchronously through a caller-supplied callable (which usually queues
    work into an event queue).  Execution terminates once the generator
    yields a value interpreted as False, stops iterating, or raises an
    exception.
    """
    def __init__(self, generator, synchronizer):
        """*generator* is the generator to drive.  *synchronizer* is a
        callable which, given another callable, arranges for it to execute
        in the synchronous context; this is usually done by queueing it in
        some form of event dispatch queue.
        """
        Executor.__init__(self, generator)
        self.__synchronizer = synchronizer

    def run(self):
        """Start a new thread which iterates the generator once and passes
        the yielded value to _finish().  Does nothing if the generator has
        already been discarded.
        """
        if self.stopped():
            return

        def background_step():
            # Runs in the new thread: the asynchronous half of the cycle.
            self._finish(self.next(handleException=True))

        threading.Thread(target=background_step).start()

    def _finish(self, run_yielded):
        """Continue after the asynchronous step.  If *run_yielded* (the
        value that step yielded) is interpreted as True and the generator is
        still live, hand the synchronizer a callable which iterates the
        generator once more and, if that yields a value interpreted as True,
        calls run() to begin the next cycle.
        """
        if not run_yielded or self.stopped():
            return

        def foreground_step():
            # Executed by the synchronizer: the synchronous half of the
            # cycle, followed by a new asynchronous step if still going.
            return self.next(handleException=True) and self.run()

        self.__synchronizer(foreground_step)

if __name__ == '__main__':
    # Serializes printThread() output so lines written from the worker
    # threads and from the event loop do not interleave.
    printLock = threading.Lock()

    def printThread(name, action, count):
        """Print *name*, loop *count* and *action* along with the current
        thread's name and the number of active threads.
        """
        printLock.acquire()
        try:
            print("%s loop %d %s in %s of %d threads" % (name, count, action,
                threading.currentThread().getName(), threading.activeCount()))
        finally:
            # Release even if formatting or printing fails; otherwise every
            # other thread would block forever on the lock.
            printLock.release()

    class TestEventQueue(Queue.Queue):
        """An event queue which executes callable entries, looping as long
        as there are more entries or active threads.
        """
        def loop(self, looptime=0.5):
            """Dispatch queued entries until the queue is empty and no
            thread other than the current one is active.  *looptime* is the
            timeout, in seconds, for each wait on the queue.
            """
            while threading.activeCount() > 1 or not self.empty():
                printThread(" Event", "running", 0)
                try:
                    entry = self.get(timeout=looptime)
                except Queue.Empty:
                    # Nothing arrived within the timeout; go round again to
                    # re-check the loop condition.  Only the get() is
                    # guarded so a Queue.Empty raised by an event itself is
                    # not silently swallowed.
                    continue
                if callable(entry):
                    entry()

    q = TestEventQueue()

    @execute(ThreadExecutor, q.put)
    def work(loops=5, worktime=2.0):
        """An example generator/worker which executes a simple loop:
        *loops* iterations, each sleeping *worktime* seconds in the worker
        thread and then finishing in the event queue.
        """
        count = 1
        while count <= loops:
            # Work performed in separate thread
            printThread("Work", "starting", count)
            time.sleep(worktime)
            printThread("Work", "ending", count)
            yield True
            # Work performed in event queue
            printThread("Work", "finishing", count)
            count += 1
            yield True

    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--loops', type='int', metavar='N', default=5,
            help='Number of times to iterate worker (default %default)')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
            help='Timeout for event loop (default %default sec.)')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
            help='Worker delay to simulate work (default %default sec.)')
    (options, args) = optparser.parse_args()

    # Create and queue the first worker, and then loop
    q.put(work(loops=options.loops, worktime=options.worktime).run)
    q.loop(looptime=options.looptime)

Diff to Previous Revision

--- revision 5 2010-10-18 20:54:23
+++ revision 6 2010-10-27 21:08:05
@@ -38,7 +38,7 @@
     import Queue
     import time
 
-__version__ = '$Revision: 1305 $'.split()[1]
+__version__ = '$Revision: 1345 $'.split()[1]
 
 __usage__ = 'usage: %prog [options]'
 
@@ -63,45 +63,74 @@
 
 
 class Executor:
-    """A skeletal base class for executors.  Not required, but useful.
+    """A skeletal base class for executors which delegates generator methods
+    to the enclosed generator.  Not required, but useful.
 
     Does not implement any context switching or asynchronous behavior; it is
     expected that subclasses will override the run() method as appropriate.
     """
     def __init__(self, generator):
-        self._generator = generator
+        self.__generator = generator
 
     def __iter__(self): return self
 
-    def _next(self):
-        """Convenience wrapper around the generator's next() method which
-        discards the generator when a StopIteration or GeneratorExit is
-        encountered; this allows for easier testing of generator exits. Also
-        catches exceptions in the generator (to prevent them from bubbling out
-        of the executor) and calls handle_exception()."""
-        if self._generator:
+    def __getattr__(self, attr):
+        """Cheap delegation to the generator"""
+        return getattr(self.__generator, attr)
+
+    def stopped(self):
+        """Checks whether our generator has been discarded."""
+        return self.__generator is None
+
+    def next(self, handleException=False):
+        """Wrapper around the generator's next() method which discards the
+        generator when a StopIteration or GeneratorExit is encountered;
+        allows for testing of generator exits using the stopped() method.
+
+        If the parameter *handleException* is True, exceptions in the
+        generator are not reraised by this wrapper. Instead, for exceptions
+        other than StopExecution and GeneratorExit, handle_exception() is
+        called. If handle_exception() returns a value interpreted as False,
+        the generator is discarded.
+        """
+        if not self.stopped():
             try:
-                return self._generator.next()
+                return self.__generator.next()
             except (StopIteration, GeneratorExit), si:
-                self._generator = None
+                self.__generator = None
+                if not handleException: raise
+                return None
             except Exception, e:
-                if not self.handle_exception(e): self._generator = None
-        return None
-
-    def _send(self, value):
-        """Convenience wrapper around the generator's send() method which
-        discards the generator when a StopIteration or GeneratorExit is
-        encountered; this allows for easier testing of generator exits. Also
-        catches exceptions in the generator (to prevent them from bubbling out
-        of the executor) and calls handle_exception()."""
-        if self._generator:
+                if not handleException: raise
+                elif not self.handle_exception(e): self.__generator = None
+                return None
+        elif not handleException: raise StopIteration()
+        else: return None
+
+    def send(self, value, handleException=False):
+        """Wrapper around the generator's send() method which discards the
+        generator when a StopIteration or GeneratorExit is encountered;
+        allows for testing of generator exits using the stopped() method.
+
+        If the parameter *handleException* is True, exceptions in the
+        generator are not reraised by this wrapper. Instead, for exceptions
+        other than StopExecution and GeneratorExit, handle_exception() is
+        called. If handle_exception() returns a value interpreted as False,
+        the generator is discarded.
+        """
+        if not self.stopped():
             try:
-                return self._generator.send(value)
+                return self.__generator.send(value)
             except (StopIteration, GeneratorExit), si:
-                self._generator = None
+                self.__generator = None
+                if not handleException: raise
+                return None
             except Exception, e:
-                if not self.handle_exception(e): self._generator = None
-        return None
+                if not handleException: raise
+                elif not self.handle_exception(e): self.__generator = None
+                return None
+        elif not handleException: raise StopIteration()
+        else: return None
 
     def run(self):
         """The run() method starts the executor, which then iterates the
@@ -109,14 +138,16 @@
         yield.
 
         The default implementation contains no asynchronous execution or
-        context changes. If the generator yields a value interpreted as False
-        or raises an exception, execution terminates."""
-        while (self._next() and self._generator): pass
+        context changes; if the generator yields a value interpreted as False,
+        stops iterating, or raises an exception, execution terminates.
+        """
+        while self.next(handleException=True) and not self.stopped(): pass
 
     def handle_exception(self, exception):
         """Handle an exception raised by the generator; if this returns a
         value interpreted as False, the generator is discarded. By default,
-        just prints a traceback to stderr and returns False."""
+        just prints a traceback to stderr and returns False.
+        """
         traceback.print_exc(file=sys.stderr)
         return False
 
@@ -125,42 +156,50 @@
     """Executes alternate iterations of a generator asynchonously (in a
     separate thread) and synchronously (through a callable, which usually
     queues into an event queue). If the generator yields a value interpreted
-    as False or raises an exception, execution terminates.
+    as False, stops iterating, or raises an exception, execution terminates.
     """
     def __init__(self, generator, synchronizer):
         """The *synchronizer* parameter is a callable which will cause a
         callable passed to it to execute in the synchronous context.  This is
         usually done by queueing the passed callable in some form of event
-        dispatch queue."""
+        dispatch queue.
+        """
         self.__synchronizer = synchronizer
         Executor.__init__(self, generator)
 
     def run(self):
         """In a separate thread, iterate the generator and pass the yielded
-        value to finish()."""
-        if self._generator:
-            threading.Thread(target=lambda:self._finish(self._next())).start()
+        value to finish().
+        """
+        if not self.stopped():
+            threading.Thread(target=lambda: self._finish(self.next(handleException=True))).start()
 
     def _finish(self, run_yielded):
         """If the asynchronous thread yielded a value interpreted as True,
         iterate the generator through the synchronizer callable. If the
         generator again yields a value interpreted as True, call run() to
-        continue iterating."""
-        if run_yielded and self._generator:
-            self.__synchronizer(lambda: self._next() and self.run())
+        continue iterating.
+        """
+        if run_yielded and not self.stopped():
+            self.__synchronizer(lambda: self.next(handleException=True) and self.run())
 
 
 if __name__ == '__main__':
+    printLock = threading.Lock()
+
     def printThread(name, action, count):
+        printLock.acquire()
         print "%s loop %d %s in %s of %d threads" % (name, count, action,
             threading.currentThread().getName(), threading.activeCount())
+        printLock.release()
 
     class TestEventQueue(Queue.Queue):
         """An event queue which executes callable entries, looping as long
-        as there are more entries or active threads."""
+        as there are more entries or active threads.
+        """
         def loop(self, looptime=0.5):
             while threading.activeCount() > 1 or not self.empty():
-                printThread("Event", "running", 0)
+                printThread(" Event", "running", 0)
                 try:
                     next = self.get(timeout=looptime)
                     if callable(next): next()
@@ -172,7 +211,7 @@
     def work(loops=5, worktime=2.0):
         """An example generator/worker which executes a simple loop."""
         count = 1
-        while 1:
+        while count <= loops:
             # Work performed in separate thread
             printThread("Work", "starting", count)
             time.sleep(worktime)
@@ -181,7 +220,7 @@
             # Work performed in event queue
             printThread("Work", "finishing", count)
             count += 1
-            yield count <= loops
+            yield True
 
     optparser = optparse.OptionParser(usage=__usage__, version=__version__)
     optparser.disable_interspersed_args()

History