#!/usr/bin/env python
"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.
The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield statements in the
generator and the send() method in the executor.
References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""
import os
import sys
import threading
import traceback

try:
    # Python 2 module name.
    from Queue import Queue, Empty
except ImportError:
    # Python 3 renamed the module to lowercase ``queue``.
    from queue import Queue, Empty
try:
    from functools import wraps
except ImportError:
    def wraps(wrapped):
        """Minimal stand-in for functools.wraps(), used only when the
        functools import above fails (Python <2.5).

        Returns a decorator which copies the name, docstring, module name
        and attribute dictionary of *wrapped* onto the decorated function
        and then returns that function.
        """
        copied_attributes = ('__name__', '__doc__', '__module__')

        def wrap(wrapper):
            for attribute in copied_attributes:
                setattr(wrapper, attribute, getattr(wrapped, attribute))
            # Merge rather than replace, so attributes already set on the
            # wrapper survive unless *wrapped* defines the same key.
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper

        return wrap
if __name__ == '__main__':
import optparse
import time
# Revision number of this module: the second whitespace-separated token of
# the '$Revision: ... $' keyword string below.
__version__ = '$Revision: 1345 $'.split()[1]
# Usage text passed to optparse.OptionParser(usage=...) by the command-line
# demo at the bottom of the file.
__usage__ = 'usage: %prog [options]'
def execute(exec_factory, *exargs, **exkeys):
    """Decorator factory which pairs a generator function with an executor.

    Calling the decorated generator function no longer returns the bare
    generator; it returns an executor wrapped around it. Execution is then
    started by calling a method on that executor (usually run()).

    *exec_factory* is any callable (a class or a function) which accepts a
    generator as its first argument and returns an executor. *exargs* and
    *exkeys* are forwarded to the factory after the generator.
    """
    def exec_wrapper(generator):
        def work_factory(*genargs, **genkeys):
            # Build the generator from the caller's arguments, then hand it
            # straight to the factory together with the decorator arguments.
            return exec_factory(generator(*genargs, **genkeys),
                                *exargs, **exkeys)

        # Preserve the name and docstring of the decorated generator.
        return wraps(generator)(work_factory)

    return exec_wrapper
class Executor:
    """A skeletal base class for executors which delegates generator methods
    to the enclosed generator. Not required, but useful.

    Does not implement any context switching or asynchronous behavior; it is
    expected that subclasses will override the run() method as appropriate.

    The executor drops its reference to the generator once the generator is
    exhausted, exits, or fails without handle_exception() asking to keep it;
    stopped() reports that state.
    """

    def __init__(self, generator):
        """Store *generator*, the iterator this executor drives."""
        self.__generator = generator

    def __iter__(self):
        """Return the executor itself; it is its own iterator."""
        return self

    def __getattr__(self, attr):
        """Cheap delegation to the generator.

        Called only for attributes that normal lookup did not find on the
        executor; the lookup is then retried on the enclosed generator.
        Raises AttributeError if the generator does not have *attr* either.
        """
        # If the private generator slot itself is missing (an instance made
        # without running __init__, e.g. by copy or pickle machinery), the
        # delegation below would look it up through __getattr__ again and
        # recurse without end. Fail with a plain AttributeError instead.
        if attr == '_Executor__generator':
            raise AttributeError(attr)
        return getattr(self.__generator, attr)

    def stopped(self):
        """Checks whether our generator has been discarded."""
        return self.__generator is None

    def _advance(self, step, handleException):
        """Shared implementation of next() and send().

        *step* is a callable which takes the generator and advances it once,
        returning the yielded value. See next() for how *handleException*
        changes the treatment of exceptions and of an already stopped
        executor.
        """
        if self.stopped():
            if handleException:
                return None
            raise StopIteration()
        try:
            return step(self.__generator)
        except (StopIteration, GeneratorExit):
            # Normal end of the generator: discard it so stopped() is True.
            self.__generator = None
            if not handleException:
                raise
        except Exception as e:
            if not handleException:
                raise
            # Called inside the except block so that handle_exception() can
            # still reach the active exception (e.g. traceback.print_exc()).
            if not self.handle_exception(e):
                self.__generator = None
        return None

    def next(self, handleException=False):
        """Wrapper around the generator's next() which discards the
        generator when a StopIteration or GeneratorExit is encountered;
        allows for testing of generator exits using the stopped() method.

        If the parameter *handleException* is True, exceptions in the
        generator are not reraised by this wrapper and None is returned
        instead. For exceptions other than StopIteration and GeneratorExit,
        handle_exception() is called; if it returns a value interpreted as
        False, the generator is discarded.

        Once stopped, raises StopIteration (or returns None when
        *handleException* is True).
        """
        # Inside a method body the bare name ``next`` is the builtin, not
        # this method.
        return self._advance(next, handleException)

    # Python 3 spelling of the iterator protocol, so that ``for`` loops and
    # the next() builtin work on an executor there as well.
    __next__ = next

    def send(self, value, handleException=False):
        """Wrapper around the generator's send() method which passes *value*
        into the generator and returns the next yielded value.

        The generator is discarded on StopIteration or GeneratorExit, and
        *handleException* has the same meaning as for next().
        """
        return self._advance(lambda generator: generator.send(value),
                             handleException)

    def run(self):
        """The run() method starts the executor, which then iterates the
        generator to completion, changing contexts as appropriate at each
        yield.

        The default implementation contains no asynchronous execution or
        context changes; if the generator yields a value interpreted as False,
        stops iterating, or raises an exception, execution terminates.
        """
        while self.next(handleException=True) and not self.stopped():
            pass

    def handle_exception(self, exception):
        """Handle an exception raised by the generator; if this returns a
        value interpreted as False, the generator is discarded. By default,
        just prints a traceback to stderr and returns False.
        """
        traceback.print_exc(file=sys.stderr)
        return False
class ThreadExecutor(Executor):
    """Executes alternate iterations of a generator asynchronously (in a
    separate thread) and synchronously (through a callable, which usually
    queues into an event queue). Iteration stops as soon as a step returns a
    value interpreted as False or the generator has been discarded.
    """

    def __init__(self, generator, synchronizer):
        """The *synchronizer* parameter is a callable which receives another
        callable and arranges for it to run in the synchronous context,
        usually by queueing it in some form of event dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator)

    def run(self):
        """Start a new thread which advances the generator by one iteration
        and passes the yielded value to _finish(). Does nothing if the
        generator has already been discarded.
        """
        if self.stopped():
            return

        def async_step():
            return self._finish(self.next(handleException=True))

        threading.Thread(target=async_step).start()

    def _finish(self, run_yielded):
        """Continue after the asynchronous step yielded *run_yielded*.

        If that value is interpreted as True and the generator is still
        present, hand the synchronizer a callable which advances the
        generator once more and, if that step also yields a value interpreted
        as True, calls run() to start the next asynchronous step.
        """
        if not run_yielded or self.stopped():
            return

        def sync_step():
            return self.next(handleException=True) and self.run()

        self.__synchronizer(sync_step)
class ExecutionQueue(Queue):
    """Manages a queue of generators which are automatically dequeued and
    executed in sequence by an executor. The executor is restarted as needed
    when entries are queued.

    Queued items must be generators: they are driven with next() and send().
    """

    def __init__(self, exec_class, *exargs, **exkeys):
        """The *exec_class*, *exargs*, and *exkeys* are passed directly to the
        @execute() decorator and used to execute the generators.
        """
        Queue.__init__(self)
        self.__executor = None
        # Guards the "is an executor running?" decision shared by put() and
        # the proxy generator below.
        self.__exec_mutex = threading.Lock()

        @execute(exec_class, *exargs, **exkeys)
        def exec_queue():
            """A proxy which forwards to the executor from the queued
            generators: values they yield are yielded onward, and values sent
            in by the executor are sent on to the generator being run.
            """
            while True:
                # Get the next entry in a threadsafe manner. Retiring the
                # executor happens under the same mutex, so put() cannot
                # mistake an executor that is about to exit for a live one.
                with self.__exec_mutex:
                    try:
                        nextgen = self.get_nowait()
                    except Empty:
                        self.__executor = None
                        break
                # Iterate the next entry to completion.
                try:
                    yielded = next(nextgen)
                    while True:
                        sent = yield yielded
                        yielded = nextgen.send(sent)
                except StopIteration:
                    pass
                except Exception:
                    # The failure is about to end this proxy generator too.
                    # Retire the executor first so that a later put() starts
                    # a fresh one instead of waiting on a dead one. Entries
                    # still queued stay queued until that next put().
                    with self.__exec_mutex:
                        self.__executor = None
                    raise
                finally:
                    self.task_done()

        # Kept on the instance (closing over ``self``), not on the class:
        # a class attribute would be overwritten by every new queue, making
        # all queues use the executor factory of the most recent one.
        self.__exec_queue = exec_queue

    def put(self, item, block=False, timeout=None):
        """Queue a generator, and start the executor if it is not running.
        Note that we change the default value for blocking to suit our needs.

        *block* and *timeout* are passed through to Queue.put().
        """
        executor = None
        with self.__exec_mutex:
            Queue.put(self, item, block, timeout)
            if self.__executor is None:
                executor = self.__executor = self.__exec_queue()
        # Start outside the mutex: the proxy generator takes the same mutex,
        # so an executor that iterates in the calling thread would otherwise
        # deadlock against this method.
        if executor is not None:
            executor.run()
if __name__ == '__main__':
    # Command-line demo: queue several example workers on an ExecutionQueue
    # driven by ThreadExecutor, then run a simple event loop in the main
    # thread until all work is finished.
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
        help='Number of workers to create (default %default)')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
        help='Number of times to iterate each worker (default %default)')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
        help='Timeout for event loop (default %default sec.)')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
        help='Worker delay to simulate work (default %default sec.)')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()
    # Callables put on eventq are executed by loop() in the main thread;
    # eventq.put is the synchronizer handed to each ThreadExecutor.
    eventq = Queue()
    execq = ExecutionQueue(ThreadExecutor, eventq.put)

    def printThread(name, action):
        """Print one status line naming the current thread and the number of
        active threads; the lock keeps lines from different threads apart.
        """
        # ``with`` releases the lock even if formatting or printing fails.
        with printLock:
            # A single parenthesised argument prints the same text whether
            # print is a statement (Python 2) or a function (Python 3).
            print("%s loop %s in %s of %d threads" % (name, action,
                threading.current_thread().name, threading.active_count()))

    def loop(looptime=0.5):
        """A simple event queue loop.

        Runs while any thread other than this one is alive or events are
        pending, waiting at most *looptime* seconds for each event.
        """
        while threading.active_count() > 1 or not eventq.empty():
            # Only the get() is guarded, so an Empty raised from inside an
            # event callable is not mistaken for a timeout.
            try:
                event = eventq.get(timeout=looptime)
            except Empty:
                printThread(" Event", "running")
                continue
            printThread(" Event", "executing event")
            if callable(event):
                event()

    def work(wnum, loops=2, worktime=2.0):
        """An example generator which executes a simple loop.

        Each pass yields twice; under ThreadExecutor the code before the
        first yield runs in a separate thread and the code between the two
        yields runs in the event loop.
        """
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            time.sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        execq.put(work(x, loops=options.loops, worktime=options.worktime))
    loop(looptime=options.looptime)
Diff to Previous Revision
--- revision 6 2010-10-27 21:08:05
+++ revision 7 2010-12-23 15:29:07
@@ -19,6 +19,8 @@
import threading
import traceback
+
+from Queue import Queue, Empty
try:
from functools import wraps
@@ -35,7 +37,6 @@
if __name__ == '__main__':
import optparse
- import Queue
import time
__version__ = '$Revision: 1345 $'.split()[1]
@@ -184,54 +185,102 @@
self.__synchronizer(lambda: self.next(handleException=True) and self.run())
+class ExecutionQueue(Queue):
+ """Manages a queue of generators which are automatically dequeued and
+ executed in sequence by an Executor. The Executor is restarted as needed
+ when entries are queued.
+ """
+ def __init__(self, exec_class, *exargs, **exkeys):
+ """The exec_class, exargs, and exkeys are passed directly to the
+ @execute() decorator and used to execute the generators."""
+ Queue.__init__(self)
+ self.__executor = None
+ self.__exec_mutex = threading.Lock()
+
+ @execute(exec_class, *exargs, **exkeys)
+ def exec_queue(self):
+ """A proxy which forwards to the executor from the queued generators."""
+ while 1:
+ # Get the next entry in a threadsafe manner
+ self.__exec_mutex.acquire()
+ try:
+ nextgen = self.get_nowait()
+ except Empty:
+ self.__executor = None
+ break
+ finally:
+ self.__exec_mutex.release()
+ # Iterate the next entry to completion
+ try:
+ input = nextgen.next()
+ while 1:
+ output = yield input
+ input = nextgen.send(output)
+ except StopIteration:
+ pass
+ finally:
+ self.task_done()
+
+ self.__class__.__exec_queue = exec_queue
+ # setattr(self.__class__, 'exec_queue', exec_queue)
+
+ def put(self, item, block=False, timeout=None):
+ """Queue a generator, and start the executor if it is not running.
+ Note that we change the default value for blocking to suit our needs."""
+ self.__exec_mutex.acquire()
+ Queue.put(self, item, block, timeout)
+ if not self.__executor:
+ self.__executor = self.__exec_queue()
+ self.__executor.run()
+ self.__exec_mutex.release()
+
+
if __name__ == '__main__':
- printLock = threading.Lock()
-
- def printThread(name, action, count):
- printLock.acquire()
- print "%s loop %d %s in %s of %d threads" % (name, count, action,
- threading.currentThread().getName(), threading.activeCount())
- printLock.release()
-
- class TestEventQueue(Queue.Queue):
- """An event queue which executes callable entries, looping as long
- as there are more entries or active threads.
- """
- def loop(self, looptime=0.5):
- while threading.activeCount() > 1 or not self.empty():
- printThread(" Event", "running", 0)
- try:
- next = self.get(timeout=looptime)
- if callable(next): next()
- except Queue.Empty: pass
-
- q = TestEventQueue()
-
- @execute(ThreadExecutor, q.put)
- def work(loops=5, worktime=2.0):
- """An example generator/worker which executes a simple loop."""
- count = 1
- while count <= loops:
- # Work performed in separate thread
- printThread("Work", "starting", count)
- time.sleep(worktime)
- printThread("Work", "ending", count)
- yield True
- # Work performed in event queue
- printThread("Work", "finishing", count)
- count += 1
- yield True
-
optparser = optparse.OptionParser(usage=__usage__, version=__version__)
optparser.disable_interspersed_args()
- optparser.add_option('--loops', type='int', metavar='N', default=5,
- help='Number of times to iterate worker (default %default)')
+ optparser.add_option('--workers', type='int', metavar='N', default=4,
+ help='Number of workers to create (default %default)')
+ optparser.add_option('--loops', type='int', metavar='N', default=2,
+ help='Number of times to iterate each worker (default %default)')
optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
help='Timeout for event loop (default %default sec.)')
optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
help='Worker delay to simulate work (default %default sec.)')
(options, args) = optparser.parse_args()
- # Create and queue the first worker, and then loop
- q.put(work(loops=options.loops, worktime=options.worktime).run)
- q.loop(looptime=options.looptime)
+ printLock = threading.Lock()
+ eventq = Queue()
+ execq = ExecutionQueue(ThreadExecutor, eventq.put)
+
+ def printThread(name, action):
+ printLock.acquire()
+ print "%s loop %s in %s of %d threads" % (name, action,
+ threading.currentThread().getName(), threading.activeCount())
+ printLock.release()
+
+ def loop(looptime=0.5):
+ """A simple event queue loop."""
+ while threading.activeCount() > 1 or not eventq.empty():
+ try:
+ next = eventq.get(timeout=looptime)
+ printThread(" Event", "executing event")
+ if callable(next): next()
+ except Empty:
+ printThread(" Event", "running")
+
+ def work(wnum, loops=2, worktime=2.0):
+ """An example generator which executes a simple loop."""
+ for count in range(loops):
+ # Work performed in separate thread
+ printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
+ time.sleep(worktime)
+ printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
+ yield True
+ # Work performed in event queue
+ printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
+ yield True
+
+ # Create and queue the workers, and then loop the event queue
+ for x in range(options.workers):
+ execq.put(work(x, loops=options.loops, worktime=options.worktime))
+ loop(looptime=options.looptime)