#!/usr/bin/env python
"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.
The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield statements in the
generator and the send() method in the executor.
References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""
import os
import sys
import threading
import traceback
try:
from functools import wraps
except ImportError:
def wraps(wrapped):
"""A naive implementation of the wraps() decorator for Python <2.5"""
def wrap(wrapper):
wrapper.__name__ = wrapped.__name__
wrapper.__doc__ = wrapped.__doc__
wrapper.__module__ = wrapped.__module__
wrapper.__dict__.update(wrapped.__dict__)
return wrapper
return wrap
if __name__ == '__main__':
import optparse
import Queue
import time
__version__ = '$Revision: 1345 $'.split()[1]
__usage__ = 'usage: %prog [options]'
def execute(exec_factory, *exargs, **exkeys):
"""Decorator which associates a generator with a corresponding executor.
Calling the decorated generator returns the executor; execution can
then be started by calling a method on the executor (usually run()).
The *exec_factory* argument is a callable factory (such as a class or
function) which takes a generator and returns an executor. The *exargs*
and *exkeys* are additional arguments passed to the factory.
"""
def exec_wrapper(generator):
@wraps(generator)
def work_factory(*genargs, **genkeys):
work_iter = generator(*genargs, **genkeys)
executor = exec_factory(work_iter, *exargs, **exkeys)
return executor
return work_factory
return exec_wrapper
class Executor:
"""A skeletal base class for executors which delegates generator methods
to the enclosed generator. Not required, but useful.
Does not implement any context switching or asynchronous behavior; it is
expected that subclasses will override the run() method as appropriate.
"""
def __init__(self, generator):
self.__generator = generator
def __iter__(self): return self
def __getattr__(self, attr):
"""Cheap delegation to the generator"""
return getattr(self.__generator, attr)
def stopped(self):
"""Checks whether our generator has been discarded."""
return self.__generator is None
def next(self, handleException=False):
"""Wrapper around the generator's next() method which discards the
generator when a StopIteration or GeneratorExit is encountered;
allows for testing of generator exits using the stopped() method.
If the parameter *handleException* is True, exceptions in the
generator are not reraised by this wrapper. Instead, for exceptions
other than StopExecution and GeneratorExit, handle_exception() is
called. If handle_exception() returns a value interpreted as False,
the generator is discarded.
"""
if not self.stopped():
try:
return self.__generator.next()
except (StopIteration, GeneratorExit), si:
self.__generator = None
if not handleException: raise
return None
except Exception, e:
if not handleException: raise
elif not self.handle_exception(e): self.__generator = None
return None
elif not handleException: raise StopIteration()
else: return None
def send(self, value, handleException=False):
"""Wrapper around the generator's send() method which discards the
generator when a StopIteration or GeneratorExit is encountered;
allows for testing of generator exits using the stopped() method.
If the parameter *handleException* is True, exceptions in the
generator are not reraised by this wrapper. Instead, for exceptions
other than StopExecution and GeneratorExit, handle_exception() is
called. If handle_exception() returns a value interpreted as False,
the generator is discarded.
"""
if not self.stopped():
try:
return self.__generator.send(value)
except (StopIteration, GeneratorExit), si:
self.__generator = None
if not handleException: raise
return None
except Exception, e:
if not handleException: raise
elif not self.handle_exception(e): self.__generator = None
return None
elif not handleException: raise StopIteration()
else: return None
def run(self):
"""The run() method starts the executor, which then iterates the
generator to completion, changing contexts as appropriate at each
yield.
The default implementation contains no asynchronous execution or
context changes; if the generator yields a value interpreted as False,
stops iterating, or raises an exception, execution terminates.
"""
while self.next(handleException=True) and not self.stopped(): pass
def handle_exception(self, exception):
"""Handle an exception raised by the generator; if this returns a
value interpreted as False, the generator is discarded. By default,
just prints a traceback to stderr and returns False.
"""
traceback.print_exc(file=sys.stderr)
return False
class ThreadExecutor(Executor):
"""Executes alternate iterations of a generator asynchonously (in a
separate thread) and synchronously (through a callable, which usually
queues into an event queue). If the generator yields a value interpreted
as False, stops iterating, or raises an exception, execution terminates.
"""
def __init__(self, generator, synchronizer):
"""The *synchronizer* parameter is a callable which will cause a
callable passed to it to execute in the synchronous context. This is
usually done by queueing the passed callable in some form of event
dispatch queue.
"""
self.__synchronizer = synchronizer
Executor.__init__(self, generator)
def run(self):
"""In a separate thread, iterate the generator and pass the yielded
value to finish().
"""
if not self.stopped():
threading.Thread(target=lambda: self._finish(self.next(handleException=True))).start()
def _finish(self, run_yielded):
"""If the asynchronous thread yielded a value interpreted as True,
iterate the generator through the synchronizer callable. If the
generator again yields a value interpreted as True, call run() to
continue iterating.
"""
if run_yielded and not self.stopped():
self.__synchronizer(lambda: self.next(handleException=True) and self.run())
if __name__ == '__main__':
printLock = threading.Lock()
def printThread(name, action, count):
printLock.acquire()
print "%s loop %d %s in %s of %d threads" % (name, count, action,
threading.currentThread().getName(), threading.activeCount())
printLock.release()
class TestEventQueue(Queue.Queue):
"""An event queue which executes callable entries, looping as long
as there are more entries or active threads.
"""
def loop(self, looptime=0.5):
while threading.activeCount() > 1 or not self.empty():
printThread(" Event", "running", 0)
try:
next = self.get(timeout=looptime)
if callable(next): next()
except Queue.Empty: pass
q = TestEventQueue()
@execute(ThreadExecutor, q.put)
def work(loops=5, worktime=2.0):
"""An example generator/worker which executes a simple loop."""
count = 1
while count <= loops:
# Work performed in separate thread
printThread("Work", "starting", count)
time.sleep(worktime)
printThread("Work", "ending", count)
yield True
# Work performed in event queue
printThread("Work", "finishing", count)
count += 1
yield True
optparser = optparse.OptionParser(usage=__usage__, version=__version__)
optparser.disable_interspersed_args()
optparser.add_option('--loops', type='int', metavar='N', default=5,
help='Number of times to iterate worker (default %default)')
optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
help='Timeout for event loop (default %default sec.)')
optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
help='Worker delay to simulate work (default %default sec.)')
(options, args) = optparser.parse_args()
# Create and queue the first worker, and then loop
q.put(work(loops=options.loops, worktime=options.worktime).run)
q.loop(looptime=options.looptime)
Diff to Previous Revision
--- revision 5 2010-10-18 20:54:23
+++ revision 6 2010-10-27 21:08:05
@@ -38,7 +38,7 @@
import Queue
import time
-__version__ = '$Revision: 1305 $'.split()[1]
+__version__ = '$Revision: 1345 $'.split()[1]
__usage__ = 'usage: %prog [options]'
@@ -63,45 +63,74 @@
class Executor:
- """A skeletal base class for executors. Not required, but useful.
+ """A skeletal base class for executors which delegates generator methods
+ to the enclosed generator. Not required, but useful.
Does not implement any context switching or asynchronous behavior; it is
expected that subclasses will override the run() method as appropriate.
"""
def __init__(self, generator):
- self._generator = generator
+ self.__generator = generator
def __iter__(self): return self
- def _next(self):
- """Convenience wrapper around the generator's next() method which
- discards the generator when a StopIteration or GeneratorExit is
- encountered; this allows for easier testing of generator exits. Also
- catches exceptions in the generator (to prevent them from bubbling out
- of the executor) and calls handle_exception()."""
- if self._generator:
+ def __getattr__(self, attr):
+ """Cheap delegation to the generator"""
+ return getattr(self.__generator, attr)
+
+ def stopped(self):
+ """Checks whether our generator has been discarded."""
+ return self.__generator is None
+
+ def next(self, handleException=False):
+ """Wrapper around the generator's next() method which discards the
+ generator when a StopIteration or GeneratorExit is encountered;
+ allows for testing of generator exits using the stopped() method.
+
+ If the parameter *handleException* is True, exceptions in the
+ generator are not reraised by this wrapper. Instead, for exceptions
+ other than StopExecution and GeneratorExit, handle_exception() is
+ called. If handle_exception() returns a value interpreted as False,
+ the generator is discarded.
+ """
+ if not self.stopped():
try:
- return self._generator.next()
+ return self.__generator.next()
except (StopIteration, GeneratorExit), si:
- self._generator = None
+ self.__generator = None
+ if not handleException: raise
+ return None
except Exception, e:
- if not self.handle_exception(e): self._generator = None
- return None
-
- def _send(self, value):
- """Convenience wrapper around the generator's send() method which
- discards the generator when a StopIteration or GeneratorExit is
- encountered; this allows for easier testing of generator exits. Also
- catches exceptions in the generator (to prevent them from bubbling out
- of the executor) and calls handle_exception()."""
- if self._generator:
+ if not handleException: raise
+ elif not self.handle_exception(e): self.__generator = None
+ return None
+ elif not handleException: raise StopIteration()
+ else: return None
+
+ def send(self, value, handleException=False):
+ """Wrapper around the generator's send() method which discards the
+ generator when a StopIteration or GeneratorExit is encountered;
+ allows for testing of generator exits using the stopped() method.
+
+ If the parameter *handleException* is True, exceptions in the
+ generator are not reraised by this wrapper. Instead, for exceptions
+ other than StopExecution and GeneratorExit, handle_exception() is
+ called. If handle_exception() returns a value interpreted as False,
+ the generator is discarded.
+ """
+ if not self.stopped():
try:
- return self._generator.send(value)
+ return self.__generator.send(value)
except (StopIteration, GeneratorExit), si:
- self._generator = None
+ self.__generator = None
+ if not handleException: raise
+ return None
except Exception, e:
- if not self.handle_exception(e): self._generator = None
- return None
+ if not handleException: raise
+ elif not self.handle_exception(e): self.__generator = None
+ return None
+ elif not handleException: raise StopIteration()
+ else: return None
def run(self):
"""The run() method starts the executor, which then iterates the
@@ -109,14 +138,16 @@
yield.
The default implementation contains no asynchronous execution or
- context changes. If the generator yields a value interpreted as False
- or raises an exception, execution terminates."""
- while (self._next() and self._generator): pass
+ context changes; if the generator yields a value interpreted as False,
+ stops iterating, or raises an exception, execution terminates.
+ """
+ while self.next(handleException=True) and not self.stopped(): pass
def handle_exception(self, exception):
"""Handle an exception raised by the generator; if this returns a
value interpreted as False, the generator is discarded. By default,
- just prints a traceback to stderr and returns False."""
+ just prints a traceback to stderr and returns False.
+ """
traceback.print_exc(file=sys.stderr)
return False
@@ -125,42 +156,50 @@
"""Executes alternate iterations of a generator asynchonously (in a
separate thread) and synchronously (through a callable, which usually
queues into an event queue). If the generator yields a value interpreted
- as False or raises an exception, execution terminates.
+ as False, stops iterating, or raises an exception, execution terminates.
"""
def __init__(self, generator, synchronizer):
"""The *synchronizer* parameter is a callable which will cause a
callable passed to it to execute in the synchronous context. This is
usually done by queueing the passed callable in some form of event
- dispatch queue."""
+ dispatch queue.
+ """
self.__synchronizer = synchronizer
Executor.__init__(self, generator)
def run(self):
"""In a separate thread, iterate the generator and pass the yielded
- value to finish()."""
- if self._generator:
- threading.Thread(target=lambda:self._finish(self._next())).start()
+ value to finish().
+ """
+ if not self.stopped():
+ threading.Thread(target=lambda: self._finish(self.next(handleException=True))).start()
def _finish(self, run_yielded):
"""If the asynchronous thread yielded a value interpreted as True,
iterate the generator through the synchronizer callable. If the
generator again yields a value interpreted as True, call run() to
- continue iterating."""
- if run_yielded and self._generator:
- self.__synchronizer(lambda: self._next() and self.run())
+ continue iterating.
+ """
+ if run_yielded and not self.stopped():
+ self.__synchronizer(lambda: self.next(handleException=True) and self.run())
if __name__ == '__main__':
+ printLock = threading.Lock()
+
def printThread(name, action, count):
+ printLock.acquire()
print "%s loop %d %s in %s of %d threads" % (name, count, action,
threading.currentThread().getName(), threading.activeCount())
+ printLock.release()
class TestEventQueue(Queue.Queue):
"""An event queue which executes callable entries, looping as long
- as there are more entries or active threads."""
+ as there are more entries or active threads.
+ """
def loop(self, looptime=0.5):
while threading.activeCount() > 1 or not self.empty():
- printThread("Event", "running", 0)
+ printThread(" Event", "running", 0)
try:
next = self.get(timeout=looptime)
if callable(next): next()
@@ -172,7 +211,7 @@
def work(loops=5, worktime=2.0):
"""An example generator/worker which executes a simple loop."""
count = 1
- while 1:
+ while count <= loops:
# Work performed in separate thread
printThread("Work", "starting", count)
time.sleep(worktime)
@@ -181,7 +220,7 @@
# Work performed in event queue
printThread("Work", "finishing", count)
count += 1
- yield count <= loops
+ yield True
optparser = optparse.OptionParser(usage=__usage__, version=__version__)
optparser.disable_interspersed_args()