#!/usr/bin/env python
"""worker.py: A framework for executing inline code, contained in a generator,
across multiple execution contexts; for example, a generator which executes
some iterations synchronously and some asynchronously.
The generator to be executed yields for each change of context. A decorator
added to the generator associates an executor with it; the executor iterates
the generator and handles the context switching at each yield. The executor
and generator can communicate values through the yield statements in the
generator and the send() method in the executor.
References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""
import os
import sys
import threading
import traceback
from Queue import Queue, Empty
try:
    from functools import wraps
except ImportError:
    def wraps(wrapped):
        """Minimal stand-in for functools.wraps() when functools is missing.

        Returns a decorator that copies the name, docstring, module and
        attribute dictionary of *wrapped* onto the function it decorates.
        """
        def wrap(wrapper):
            # Copy the identifying metadata in a fixed order.
            for attribute in ("__name__", "__doc__", "__module__"):
                setattr(wrapper, attribute, getattr(wrapped, attribute))
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper
        return wrap
if __name__ == '__main__':
import optparse
import time
# Revision number: the second whitespace-separated field of the
# version-control keyword string below.
__version__ = '$Revision: 1538 $'.split()[1]
# Usage text handed to the demo's OptionParser as its usage= argument.
__usage__ = 'usage: %prog [options]'
def execute(exec_factory, *exargs, **exkeys):
    """Decorator which associates a generator with a corresponding executor.

    Calling the decorated generator function creates the generator, builds an
    executor for it through *exec_factory*, calls that executor (which must
    be a callable) to start it, and returns whatever that call returns.

    The *exec_factory* argument is a callable (such as a class or function)
    which takes a generator as its first argument and returns a callable
    executor. *exargs* and *exkeys* are passed to the factory callable.

    A generator function decorated with this decorator gains the additional
    (undeclared) keyword parameter *exc_handler*, which is intercepted and
    passed to the executor factory rather than the generator function. The
    handler applies to that single call only.
    """
    def exec_wrapper(generator):
        @wraps(generator)
        def work_factory(*genargs, **genkeys):
            # Work on a per-call copy: writing into the shared decorator-level
            # dict would make one call's exc_handler leak into later calls.
            factory_keys = dict(exkeys)
            if "exc_handler" in genkeys:
                factory_keys["exc_handler"] = genkeys.pop("exc_handler")
            work_iter = generator(*genargs, **genkeys)
            return exec_factory(work_iter, *exargs, **factory_keys)()
        return work_factory
    return exec_wrapper
class Executor:
    """A skeletal base class for executors which delegates to the enclosed
    generator. Not required, but useful.

    Does not implement any context switching or asynchronous behavior; it is
    expected that subclasses will reimplement the __call__() method as
    appropriate.

    Note that when an exception handler is installed, next() returns None
    instead of raising StopIteration once the generator has stopped, so a
    plain ``for`` loop over the executor will not terminate by itself; check
    stopped() instead.
    """

    def __init__(self, generator, exc_handler=traceback.print_exc):
        """*generator* is the generator (or generator-like object) to drive.

        *exc_handler* is a callable which will be called by _handle_exit()
        if the generator raises an exception other than StopIteration. If it
        is None, exceptions (including StopIteration) are reraised.
        """
        self.__generator = generator
        self.__exc_handler = exc_handler

    def __iter__(self):
        return self

    def __getattr__(self, attr):
        """Cheap delegation to the generator.

        Raises AttributeError for this class's own private attributes, so
        that an instance whose __init__() has not run does not recurse
        endlessly while looking up its missing generator.
        """
        if attr.startswith("_Executor__"):
            raise AttributeError(attr)
        return getattr(self.__generator, attr)

    def __call__(self):
        """Starts the executor, which then iterates the generator to
        completion, changing contexts as appropriate at each yield.

        The default implementation contains no asynchronous execution or
        context changes. Execution stops when the generator stops iterating.
        Returns the executor itself.
        """
        while not self.stopped():
            self.next()
        return self

    def _handle_exit(self, isStop=False):
        """Discards the generator on exit, so that generator exit can be
        checked by calling stopped() rather than wrapping everything in
        try...except. Calls a handler (passed to __init__() in *exc_handler*)
        if the generator raised an exception other than StopIteration;
        reraises exceptions (including StopIteration) if the handler is None.

        *isStop* is True if the generator raised StopIteration.

        Must be called while an exception is being handled, because the
        no-handler path uses a bare ``raise``. Returns None otherwise.
        """
        self.__generator = None
        if not self.__exc_handler:
            raise
        elif not isStop:
            self.__exc_handler()
        return None

    def stopped(self):
        """Checks whether our generator has exited and been discarded."""
        return self.__generator is None

    def __call_gen(self, method, *args, **keys):
        """Wrapper around the generator's methods which calls _handle_exit()
        when the generator exits or raises an exception.

        *method* is the name of the generator method to call. Once the
        generator has been discarded this raises StopIteration if there is
        no exception handler and returns None otherwise.
        """
        if not self.stopped():
            try:
                # Iterators may spell the advance method next() or __next__().
                if method == "next" and not hasattr(self.__generator, "next"):
                    method = "__next__"
                m = getattr(self.__generator, method)
                return m(*args, **keys)
            except StopIteration:
                return self._handle_exit(True)
            except:
                # Deliberately catches everything: the handler (or the bare
                # raise in _handle_exit) decides what happens next.
                return self._handle_exit(False)
        elif self.__exc_handler is None:
            raise StopIteration()
        else:
            return None

    def throw(self, *args, **keys):
        """Wrapper around the generator's throw() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("throw", *args, **keys)

    def close(self):
        """Wrapper around the generator's close() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("close")

    def next(self):
        """Wrapper around the generator's next() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("next")

    # Alias so the executor also satisfies the __next__() iterator spelling.
    __next__ = next

    def send(self, value):
        """Wrapper around the generator's send() method which calls
        _handle_exit() when the generator exits or raises an exception."""
        return self.__call_gen("send", value)
class ThreadExecutor(Executor):
    """Runs the generator's iterations alternately in two contexts: one
    iteration asynchronously (in a new thread), the next synchronously
    (through a callable, which usually queues into an event queue).
    """

    def __init__(self, generator, synchronizer, *args, **kwargs):
        """*synchronizer* is a callable which will cause a callable passed to
        it to execute in the synchronous context. This is usually done by
        queueing the passed callable in some form of event dispatch queue.

        Remaining arguments are forwarded to Executor.__init__().
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator, *args, **kwargs)

    def __call__(self):
        """Unless the generator has stopped, run one iteration in a new
        thread and have that thread call __finish() afterwards.

        Returns the executor itself.
        """
        if self.stopped():
            return self

        def threaded_step():
            self.next()
            self.__finish()

        try:
            worker = threading.Thread(target=threaded_step)
            worker.start()
        except:
            # Report a failure to start the thread to the generator itself.
            self.throw(*sys.exc_info())
        return self

    def __finish(self):
        """Unless the generator has stopped, hand the synchronizer a callable
        that runs one iteration and then calls self() to continue iterating.
        """
        if self.stopped():
            return

        def synchronous_step():
            # Same result as the tuple expression (self.next(), self()).
            return (self.next(), self())

        try:
            self.__synchronizer(synchronous_step)
        except:
            self.throw(*sys.exc_info())
class QueuedGeneratorWrapper():
    """A wrapper around a generator which overrides the generator methods to
    call task_done() on a Queue when the generator terminates.

    task_done() is called exactly once: either when a wrapped call raises
    (including StopIteration) or when close() completes. After that every
    method raises StopIteration.
    """

    def __init__(self, generator, queue):
        """*generator* is the generator to wrap; *queue* is any object with a
        task_done() method."""
        self.__generator = generator
        self.__queue = queue

    def __finished(self):
        """Discard the generator and report completion to the queue."""
        self.__generator = None
        self.__queue.task_done()

    def __call_gen(self, method, *args, **keys):
        """Call the generator method named *method*, reporting completion to
        the queue when the generator terminates. Exceptions raised by the
        generator are propagated to the caller."""
        if self.__generator is None:
            raise StopIteration()
        try:
            # Iterators may spell the advance method next() or __next__().
            if method == "next" and not hasattr(self.__generator, "next"):
                method = "__next__"
            m = getattr(self.__generator, method)
            result = m(*args, **keys)
        except:
            self.__finished()
            raise
        if method == "close":
            # A successful close() ends the generator without raising, so
            # completion has to be reported here or the queue would only
            # advance if some later call happened to raise.
            self.__finished()
        return result

    def throw(self, *args, **keys):
        return self.__call_gen("throw", *args, **keys)

    def close(self):
        return self.__call_gen("close")

    def next(self):
        return self.__call_gen("next")

    # Alias so the wrapper also satisfies the __next__() iterator spelling.
    __next__ = next

    def send(self, value):
        return self.__call_gen("send", value)
class ExecutionQueue(Queue):
    """An executor factory that maintains a queue of Executors which are
    automatically dequeued and executed in sequence.

    An instance of this class can be passed to the @execute decorator,
    followed by the Executor class (and its arguments) that will be used to
    execute the decorated generator.
    """

    def __init__(self, *qargs, **qkeys):
        """*qargs* and *qkeys* are passed to the underlying Queue object."""
        Queue.__init__(self, *qargs, **qkeys)
        # The most recently started item, or None when nothing is running.
        self.__current_exec = None
        # Guards __current_exec and the decision to dequeue.
        self.__exec_mutex = threading.Lock()

    def __call__(self, generator, exec_class, *exargs, **exkeys):
        """Create an executor, wrapping the generator in an object that starts
        the next queued executor when this one exits.

        Returns a callable which queues the executor (without blocking) and
        returns it, so that a function decorated with @execute hands the
        executor back to its caller.
        """
        executor = exec_class(QueuedGeneratorWrapper(generator, self),
                              *exargs, **exkeys)

        def enqueue():
            self.put_nowait(executor)
            return executor
        return enqueue

    def task_done(self):
        """Start up the next task in the queue as each one completes."""
        Queue.task_done(self)
        self.__next(isRunning=True)

    def __next(self, isRunning=False):
        """Dequeue and start the next executor.

        *isRunning* is checked against the current execution status before
        dequeuing and starting the next executor: nothing happens unless
        *isRunning* matches whether an executor is currently recorded as
        running.
        """
        self.__exec_mutex.acquire()
        try:
            if isRunning == (self.__current_exec is None):
                return
            try:
                starter = self.get_nowait()
            except Empty:
                # Nothing left to run; mark the queue as idle.
                self.__current_exec = None
                return
            self.__current_exec = starter
        finally:
            self.__exec_mutex.release()
        # Call through the local reference: the attribute may be replaced by
        # another thread as soon as the mutex is released.
        starter()

    def put(self, item, *args, **kwargs):
        """Queue an executor, and start execution if it is not running."""
        Queue.put(self, item, *args, **kwargs)
        self.__next()
if __name__ == '__main__':
    # Demonstration: queue several workers on an ExecutionQueue, each of
    # which alternates between a worker thread and a simple event loop.
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
        help='Number of workers to create (default %default)')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
        help='Number of times to iterate each worker (default %default)')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
        help='Timeout for event loop (default %default sec.)')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
        help='Worker delay to simulate work (default %default sec.)')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()   # serializes output from all threads
    eventq = Queue()               # callables to run in the event loop
    execq = ExecutionQueue()       # runs the workers one after another

    def printThread(name, action):
        """Print a status line naming the current thread and thread count."""
        printLock.acquire()
        try:
            # Single-argument call form prints the same text whether print
            # is a statement or a function.
            print("%s loop %s in %s of %d threads" % (name, action,
                threading.currentThread().getName(), threading.activeCount()))
        finally:
            # Release even if printing fails, so other threads cannot hang.
            printLock.release()

    def loop(looptime=0.5):
        """A simple event queue loop.

        Runs until only this thread is alive and the event queue is empty,
        waiting at most *looptime* seconds for each event.
        """
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                event = eventq.get(timeout=looptime)
                printThread(" Event", "executing event")
                if callable(event):
                    event()
            except Empty:
                printThread(" Event", "running")

    @execute(execq, ThreadExecutor, eventq.put)
    def work(wnum, loops=2, worktime=2.0):
        """An example generator which executes a simple loop."""
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            time.sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        work(x, loops=options.loops, worktime=options.worktime,
             exc_handler=traceback.print_exc)
    loop(looptime=options.looptime)
Diff to Previous Revision
--- revision 10 2011-02-11 18:59:11
+++ revision 11 2011-02-14 20:53:11
@@ -39,7 +39,7 @@
import optparse
import time
-__version__ = '$Revision: 1521 $'.split()[1]
+__version__ = '$Revision: 1538 $'.split()[1]
__usage__ = 'usage: %prog [options]'
@@ -99,8 +99,9 @@
context changes. Execution stops when the generator stops iterating.
"""
while not self.stopped(): self.next()
-
- def __handle_exit(self, isStop=False):
+ return self
+
+ def _handle_exit(self, isStop=False):
"""Discards the generator on exit, so that generator exit can be
checked by calling stopped() rather than wrapping everything in
try...except. Calls a handler (passed to __init__() in *exc_handler*)
@@ -123,33 +124,34 @@
when the generator exits or raises an exception."""
if not self.stopped():
try:
- return method(*args, **keys)
+ m = getattr(self.__generator, method)
+ return m(*args, **keys)
except StopIteration:
- return self.__handle_exit(True)
+ return self._handle_exit(True)
except:
- return self.__handle_exit(False)
+ return self._handle_exit(False)
elif self.__exc_handler is None: raise StopIteration()
else: return None
def throw(self, *args, **keys):
"""Wrapper around the generator's throw() method which calls
_handle_exit() when the generator exits or raises an exception."""
- return self.__call_gen(self.__generator.throw, *args, **keys)
+ return self.__call_gen("throw", *args, **keys)
def close(self):
"""Wrapper around the generator's close() method which calls
_handle_exit() when the generator exits or raises an exception."""
- return self.__call_gen(self.__generator.close)
+ return self.__call_gen("close")
def next(self):
"""Wrapper around the generator's next() method which calls
_handle_exit() when the generator exits or raises an exception."""
- return self.__call_gen(self.__generator.next)
+ return self.__call_gen("next")
def send(self, value):
"""Wrapper around the generator's send() method which calls
_handle_exit() when the generator exits or raises an exception."""
- return self.__call_gen(self.__generator.send, value)
+ return self.__call_gen("send", value)
class ThreadExecutor(Executor):
@@ -174,6 +176,7 @@
threading.Thread(target=lambda:(self.next(), self.__finish())).start()
except:
self.throw(*sys.exc_info())
+ return self
def __finish(self):
"""If the generator has not stopped, iterate the generator through the
@@ -196,7 +199,8 @@
def __call_gen(self, method, *args, **keys):
if self.__generator is not None:
try:
- return method(*args, **keys)
+ m = getattr(self.__generator, method)
+ return m(*args, **keys)
except:
self.__generator = None
self.__queue.task_done()
@@ -204,16 +208,16 @@
else: raise StopIteration()
def throw(self, *args, **keys):
- return self.__call_gen(self.__generator.throw, *args, **keys)
+ return self.__call_gen("throw", *args, **keys)
def close(self):
- return self.__call_gen(self.__generator.close)
+ return self.__call_gen("close")
def next(self):
- return self.__call_gen(self.__generator.next)
+ return self.__call_gen("next")
def send(self, value):
- return self.__call_gen(self.__generator.send, value)
+ return self.__call_gen("send", value)
class ExecutionQueue(Queue):