#!/usr/bin/env python

"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.

The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield statements in the
generator and the send() method in the executor.

References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""

import os
import sys

import threading
import traceback

from Queue import Queue, Empty

try:
    from functools import wraps
except ImportError:
    def wraps(wrapped):
        """A naive implementation of the wraps() decorator for Python <2.5"""
        def wrap(wrapper):
            wrapper.__name__ = wrapped.__name__
            wrapper.__doc__ = wrapped.__doc__
            wrapper.__module__ = wrapped.__module__
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper
        return wrap

if __name__ == '__main__':
    import optparse
    import time

__version__ = '$Revision: 1345 $'.split()[1]

__usage__ = 'usage: %prog [options]'


def execute(exec_factory, *exargs, **exkeys):
    """Decorator which associates a generator with a corresponding executor.
    Calling the decorated generator returns the executor; execution can
    then be started by calling a method on the executor (usually run()).

    The *exec_factory* argument is a callable factory (such as a class or
    function) which takes a generator and returns an executor.  The *exargs*
    and *exkeys* are additional arguments passed to the factory.
    """
    def exec_wrapper(generator):
        @wraps(generator)
        def work_factory(*genargs, **genkeys):
            work_iter = generator(*genargs, **genkeys)
            executor = exec_factory(work_iter, *exargs, **exkeys)
            return executor
        return work_factory
    return exec_wrapper
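
# A quick usage sketch (hypothetical generator name): calling the decorated
# generator does not start it.  The call builds the generator, wraps it in
# the executor produced by *exec_factory*, and returns that executor, which
# is then driven explicitly, usually via run():
#
#     @execute(Executor)
#     def countdown(n):
#         while n:
#             print n
#             yield True
#             n -= 1
#
#     countdown(3).run()    # prints 3, 2, 1 in the current context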


class Executor:
    """A skeletal base class for executors which delegates generator methods
    to the enclosed generator.  Not required, but useful.

    Does not implement any context switching or asynchronous behavior; it is
    expected that subclasses will override the run() method as appropriate.
    """
    def __init__(self, generator):
        self.__generator = generator

    def __iter__(self): return self

    def __getattr__(self, attr):
        """Cheap delegation to the generator"""
        return getattr(self.__generator, attr)

    def stopped(self):
        """Checks whether our generator has been discarded."""
        return self.__generator is None

    def next(self, handleException=False):
        """Wrapper around the generator's next() method which discards the
        generator when a StopIteration or GeneratorExit is encountered;
        allows for testing of generator exits using the stopped() method.

        If the parameter *handleException* is True, exceptions in the
        generator are not reraised by this wrapper. Instead, for exceptions
        other than StopIteration and GeneratorExit, handle_exception() is
        called. If handle_exception() returns a value interpreted as False,
        the generator is discarded.
        """
        if not self.stopped():
            try:
                return self.__generator.next()
            except (StopIteration, GeneratorExit), si:
                self.__generator = None
                if not handleException: raise
                return None
            except Exception, e:
                if not handleException: raise
                elif not self.handle_exception(e): self.__generator = None
                return None
        elif not handleException: raise StopIteration()
        else: return None

    def send(self, value, handleException=False):
        """Wrapper around the generator's send() method which discards the
        generator when a StopIteration or GeneratorExit is encountered;
        allows for testing of generator exits using the stopped() method.

        If the parameter *handleException* is True, exceptions in the
        generator are not reraised by this wrapper. Instead, for exceptions
        other than StopIteration and GeneratorExit, handle_exception() is
        called. If handle_exception() returns a value interpreted as False,
        the generator is discarded.
        """
        if not self.stopped():
            try:
                return self.__generator.send(value)
            except (StopIteration, GeneratorExit), si:
                self.__generator = None
                if not handleException: raise
                return None
            except Exception, e:
                if not handleException: raise
                elif not self.handle_exception(e): self.__generator = None
                return None
        elif not handleException: raise StopIteration()
        else: return None

    def run(self):
        """The run() method starts the executor, which then iterates the
        generator to completion, changing contexts as appropriate at each
        yield.

        The default implementation contains no asynchronous execution or
        context changes; if the generator yields a value interpreted as False,
        stops iterating, or raises an exception, execution terminates.
        """
        while self.next(handleException=True) and not self.stopped(): pass

    def handle_exception(self, exception):
        """Handle an exception raised by the generator; if this returns a
        value interpreted as False, the generator is discarded. By default,
        just prints a traceback to stderr and returns False.
        """
        traceback.print_exc(file=sys.stderr)
        return False
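
# A minimal sketch (hypothetical subclass): concrete executors normally
# override run() to add context switching, and may also override
# handle_exception() to report errors somewhere other than stderr.  As in
# the base class, returning False discards the generator:
#
#     class LoggingExecutor(Executor):
#         def handle_exception(self, exception):
#             log_error(exception)    # hypothetical application logger
#             return False            # discard the generator, like the default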


class ThreadExecutor(Executor):
    """Executes alternate iterations of a generator asynchonously (in a
    separate thread) and synchronously (through a callable, which usually
    queues into an event queue). If the generator yields a value interpreted
    as False, stops iterating, or raises an exception, execution terminates.
    """
    def __init__(self, generator, synchronizer):
        """The *synchronizer* parameter is a callable which will cause a
        callable passed to it to execute in the synchronous context.  This is
        usually done by queueing the passed callable in some form of event
        dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator)

    def run(self):
        """In a separate thread, iterate the generator and pass the yielded
        value to finish().
        """
        if not self.stopped():
            threading.Thread(
                target=lambda: self._finish(self.next(handleException=True))
            ).start()

    def _finish(self, run_yielded):
        """If the asynchronous thread yielded a value interpreted as True,
        iterate the generator through the synchronizer callable. If the
        generator again yields a value interpreted as True, call run() to
        continue iterating.
        """
        if run_yielded and not self.stopped():
            self.__synchronizer(lambda: self.next(handleException=True) and self.run())
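
# A usage sketch (hypothetical worker): in a GUI program the synchronizer is
# typically the toolkit's "run this in the main thread" hook, for example
# wxPython's wx.CallAfter.  The worker then alternates between a background
# thread and the GUI thread at each yield:
#
#     @execute(ThreadExecutor, wx.CallAfter)
#     def fetch_and_display(url):
#         data = download(url)     # hypothetical slow call, runs in a worker thread
#         yield True
#         show_result(data)        # hypothetical UI update, runs in the GUI thread
#         yield True
#
#     fetch_and_display('http://example.com/').run()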


class ExecutionQueue(Queue):
    """Manages a queue of generators which are automatically dequeued and
    executed in sequence by an Executor. The Executor is restarted as needed
    when entries are queued.
    """
    def __init__(self, exec_class, *exargs, **exkeys):
        """The exec_class, exargs, and exkeys are passed directly to the
        @execute() decorator and used to execute the generators."""
        Queue.__init__(self)
        self.__executor = None
        self.__exec_mutex = threading.Lock()

        @execute(exec_class, *exargs, **exkeys)
        def exec_queue(self):
            """A proxy which forwards to the executor from the queued generators."""
            while 1:
                # Get the next entry in a threadsafe manner
                self.__exec_mutex.acquire()
                try:
                    nextgen = self.get_nowait()
                except Empty:
                    self.__executor = None
                    break
                finally:
                    self.__exec_mutex.release()
                # Iterate the next entry to completion
                try:
                    input = nextgen.next()
                    while 1:
                        output = yield input
                        input = nextgen.send(output)
                except StopIteration:
                    pass
                finally:
                    self.task_done()

        self.__class__.__exec_queue = exec_queue
        # setattr(self.__class__, 'exec_queue', exec_queue)

    def put(self, item, block=False, timeout=None):
        """Queue a generator, and start the executor if it is not running.
        Note that we change the default value for blocking to suit our needs."""
        self.__exec_mutex.acquire()
        try:
            Queue.put(self, item, block, timeout)
            if not self.__executor:
                self.__executor = self.__exec_queue()
                self.__executor.run()
        finally:
            self.__exec_mutex.release()
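
# A brief sketch of the intended pattern (the full demo is under __main__
# below): generators put on an ExecutionQueue run one at a time through a
# single shared executor, so a second worker starts only after the first
# has finished:
#
#     eventq = Queue()
#     execq = ExecutionQueue(ThreadExecutor, eventq.put)
#     execq.put(make_worker(1))    # hypothetical generator factory; starts the executor
#     execq.put(make_worker(2))    # queued; runs after the first worker completes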


if __name__ == '__main__':
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
            help='Number of workers to create (default %default)')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
            help='Number of times to iterate each worker (default %default)')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
            help='Timeout for event loop (default %default sec.)')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
            help='Worker delay to simulate work (default %default sec.)')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()
    eventq = Queue()
    execq = ExecutionQueue(ThreadExecutor, eventq.put)

    def printThread(name, action):
        printLock.acquire()
        print "%s loop %s in %s of %d threads" % (name, action,
            threading.currentThread().getName(), threading.activeCount())
        printLock.release()

    def loop(looptime=0.5):
        """A simple event queue loop."""
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                next = eventq.get(timeout=looptime)
                printThread(" Event", "executing event")
                if callable(next): next()
            except Empty:
                printThread(" Event", "running")

    def work(wnum, loops=2, worktime=2.0):
        """An example generator which executes a simple loop."""
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            time.sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        execq.put(work(x, loops=options.loops, worktime=options.worktime))
    loop(looptime=options.looptime)
