A framework for executing inline code, contained in a generator, across multiple execution contexts, by pairing it with an executor that handles the context switching at each yield. An example of a generator which executes some iterations synchronously and some asynchronously is provided. The framework is general enough to be applied to many different coroutine situations.
#!/usr/bin/env python
"""worker.py: Executes inline code across multiple execution contexts.
The inline code to be executed is contained in a generator, which contains a
`yield` statement to signal each context change. A decorator added to the
generator function wraps the returned generator in the executor when the
generator function is called. Calling the returned executor iterates the
generator to completion, handling the context switching at each `yield`. The
executor and generator can communicate values through the `yield` statements
and `send()` method of the generator.
An example is provided of an executor which executes alternate iterations of a
generator asynchronously (in a `Thread`) and synchronously (in an event loop).
References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""
import threading
from sys import exc_info
from traceback import print_exc
from functools import partial, wraps
from Queue import Queue, Empty
if __name__ == '__main__':
import optparse
from time import sleep
__version__ = '$Revision: 2539 $'.split()[1]
__usage__ = 'usage: %prog [options]'
def execute(exec_factory, *exargs, **exkeys):
    """Decorator factory: wrap each generator returned by a function in an executor.

    `exec_factory` is any callable (such as a class or function) which takes
    a generator as its first argument and returns a callable executor; the
    executor can then be called to iterate the generator to completion, and
    should also implement the signature of the wrapped generator.

    Arguments reach the machinery in three places:

    * `exargs` and `exkeys`, given to this decorator, are forwarded to
      `exec_factory` each time an executor is instantiated;
    * arguments of a call to the decorated generator function are passed
      unchanged when instantiating the generator; and
    * arguments of a call to the returned executor are handled by the
      executor itself.
    """
    def decorator(gen_func):
        @wraps(gen_func)
        def make_executor(*genargs, **genkeys):
            # Build the generator, then hand it to the factory together with
            # the arguments captured by the decorator.
            return exec_factory(gen_func(*genargs, **genkeys), *exargs, **exkeys)
        return make_executor
    return decorator
class Executor:
    """A skeletal base class for executors.

    Delegates to the enclosed generator so as not to change the signature.
    Subclasses must implement the `_execute()` method.
    """
    def __init__(self, generator, exc_handler=print_exc):
        """Construct an executor around `generator`.

        If `exc_handler` is not `None` and any method of the generator raises
        an exception other than `StopIteration`, `exc_handler` will be called;
        otherwise, exceptions (including `StopIteration`) are reraised to the
        caller. Thus, an implementation of Executor can call the exception
        handler by calling `self.throw(*sys.exc_info())`; the exception will
        be handled by the generator, handled by the exception handler (if
        any), or thrown back to the caller.
        """
        self.__generator = generator
        self.__executing = False            # set True by the first __call__()
        self.__exc_handler = exc_handler
        # Bind the generator's methods up front so delegation does not pay an
        # attribute lookup on every iteration (Python 2 generator protocol).
        self.__throw = generator.throw
        self.__send = generator.send
        self.__next = generator.next
        self.__close = generator.close
    def __iter__(self): return self
    def __call_gen(self, method, *args, **keys):
        """Call a generator method; if the generator exits, discard it via `_handle_exit()`."""
        if not self.stopped():
            try:
                return method(*args, **keys)
            except StopIteration:
                return self._handle_exit(True)
            except:
                # Deliberately broad: _handle_exit() either reraises the
                # active exception or routes it to the handler.
                return self._handle_exit(False)
        elif self.__exc_handler is None: raise StopIteration()
        else: return None
    def __call__(self, *args, **keys):
        """Start the executor. May only be called once per executor.

        Returns the executor, as a convenience for chained calls.
        Subclasses should not override this method, but should implement the
        `_execute()` method to iterate the generator to completion, changing
        contexts as appropriate at each `yield` statement.
        If the `exc_handler` key is present, it will be removed and will
        override the default exception handler.
        """
        if not self.__executing:
            self.__executing = True
            exc_handler = keys.pop('exc_handler', None)
            if exc_handler is not None: self.__exc_handler = exc_handler
            try:
                self._execute(*args, **keys)
            except:
                # Route any startup failure through the generator/handler.
                self.throw(*exc_info())
        else:
            raise ValueError("executor already executing." if self.__generator else "executor already complete.")
        return self
    def _execute(self):
        """Start the executor.

        Subclasses must implement the `_execute()` method to iterate the
        generator to completion, changing contexts as appropriate at each
        `yield` statement.
        """
        # Parenthesized raise replaces the deprecated Python 2 comma syntax;
        # behavior is identical and it remains valid on Python 2.
        raise NotImplementedError("_execute() must be implemented in subclass")
    def _handle_exit(self, isStop=False):
        """Handle a generator exit.

        Discards the generator, so that generator exit can be checked by
        calling `stopped()`, rather than wrapping every call to the generator
        in a `try...except` clause. Calls `exc_handler` if the generator
        raises an exception other than `StopIteration`; reraises exceptions
        (including `StopIteration`) if `exc_handler` is None.
        `isStop` is `True` if the generator raised `StopIteration`.
        """
        self.__generator = None
        # The bare `raise` is valid because _handle_exit() is only reached
        # from the except clauses of __call_gen(), where an exception is active.
        if not self.__exc_handler: raise
        elif not isStop: self.__exc_handler()
        return None
    def stopped(self):
        """Check whether the generator has exited."""
        return self.__generator is None
    def throw(self, *args, **keys):
        """Throw into the generator; if it exits, discard it via `_handle_exit()`."""
        return self.__call_gen(self.__throw, *args, **keys)
    def close(self):
        """Close the generator; if it exits, discard it via `_handle_exit()`."""
        return self.__call_gen(self.__close)
    def next(self):
        """Advance the generator; if it exits, discard it via `_handle_exit()`."""
        return self.__call_gen(self.__next)
    def send(self, value):
        """Send into the generator; if it exits, discard it via `_handle_exit()`."""
        return self.__call_gen(self.__send, value)
class ThreadExecutor(Executor):
    """Executes alternate iterations asynchronously and synchronously.

    Asynchronous iterations run in a freshly spawned thread; synchronous
    iterations are handed to a callable which usually queues them into an
    event queue.
    """
    def __init__(self, generator, synchronizer, exc_handler=print_exc):
        """Construct a threaded executor.

        `synchronizer` is a callable which executes a callable passed to it in
        the synchronous context, usually by queueing the passed callable in an
        event dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator, exc_handler)
    def _execute(self):
        """Check for exit, run one iteration in a new thread, then call `__finish()`."""
        if self.stopped():
            return
        def run_async():
            # Executes in the spawned thread.
            self.next()
            self.__finish()
        try:
            threading.Thread(target=run_async).start()
        except:
            self.throw(*exc_info())
    def __finish(self):
        """Check for exit, run one iteration synchronously, then loop via `_execute()`."""
        if self.stopped():
            return
        def run_sync():
            # Executes in the synchronous context (e.g. the event loop).
            self.next()
            self._execute()
        try:
            self.__synchronizer(run_sync)
        except:
            self.throw(*exc_info())
class GeneratorWrapper(Executor):
    """An executor which presents a generator as a plain callable."""
    def __init__(self, generator, iterate_once=False, exc_handler=None):
        """Construct the wrapper.

        If `iterate_once` is `True`, the generator is advanced once (via
        `next()`) immediately after construction, so that the arguments of
        the first `__call__()` can be delivered with `send()`; the value
        yielded by that priming step is discarded. If the first `__call__()`
        passes no arguments or a single `None`, the first iteration will
        succeed without priming.
        Note also that the default exception handler for a wrapper is `None`,
        so that exceptions are raised to the caller of `__call__()`.
        """
        Executor.__init__(self, generator, exc_handler)
        if iterate_once: generator.next()
    def __call__(self, *args, **keys):
        """Advance the generator once and return the yielded value.

        The call's arguments are packaged in the most reasonable form and
        delivered through `send()` (or `next()` when there are none).
        Overrides `__call__()` rather than `_execute()`, because a wrapper
        does not iterate to completion.
        """
        if args and keys:
            return self.send((args, keys))
        if keys:
            return self.send(keys)
        if len(args) > 1:
            return self.send(args)
        if args:
            return self.send(args[0])
        return self.next()
class QueuedGeneratorWrapper:
    """Delegate to a generator, calling `task_done()` on a `Queue` when it exits."""
    def __init__(self, generator, queue):
        self.__generator = generator
        self.__queue = queue
    def __call_gen(self, method, *args, **keys):
        # Any exception (StopIteration included) marks the generator as
        # finished and notifies the queue before propagating to the caller.
        if self.__generator is None:
            raise StopIteration()
        try:
            return method(*args, **keys)
        except:
            self.__generator = None
            self.__queue.task_done()
            raise
    def __getattr__(self, name):
        """Delegate attribute access to the wrapped generator.

        Callable attributes are wrapped so every call routes through
        `__call_gen()`; plain attributes are returned unchanged.
        """
        if self.__generator is None:
            raise AttributeError("no generator in QueuedGeneratorWrapper")
        target = getattr(self.__generator, name)
        if callable(target):
            return partial(QueuedGeneratorWrapper.__call_gen, self, target)
        return target
class ExecutionQueue(Queue):
    """A queue of Executors which are dequeued and executed in sequence.

    An instance of this class can be passed to the `@execute` decorator,
    followed by the executor factory and its arguments that will be used to
    execute the decorated generator function. When the executors returned by
    the decorated generator functions are called, they will be queued for
    execution rather than executing immediately.
    """
    def __init__(self, *qargs, **qkeys):
        """Construct an execution queue.

        `qargs` and `qkeys` are passed to the underlying `Queue` object.
        """
        Queue.__init__(self, *qargs, **qkeys)
        # The executor currently running, or None when the queue is idle.
        self.__current_exec = None
        # Guards __current_exec; note this is a NON-reentrant Lock.
        self.__exec_mutex = threading.Lock()
    def __call__(self, generator, exec_class, *exargs, **exkeys):
        """Create an executor and return a function which will queue it.

        Wrap the generator in an object which starts the next queued executor
        when it exits.
        """
        # QueuedGeneratorWrapper calls task_done() on this queue when the
        # generator exits, which in turn starts the next queued entry.
        executor = exec_class(QueuedGeneratorWrapper(generator, self), *exargs, **exkeys)
        # Queue the executor together with the call arguments; execution is
        # deferred until __next() dequeues the tuple.
        return lambda *args, **keys: self.put_nowait((executor, args, keys))
    def task_done(self):
        """Start up the next task in the queue as each one completes."""
        Queue.task_done(self)
        self.__next(isRunning=True)
    def put(self, item, *args, **kwargs):
        """Queue an executor, along with its arguments.
        Start executing the queue if this is the first entry."""
        Queue.put(self, item, *args, **kwargs)
        self.__next()
    def flush(self):
        """Empty the execution queue and then close the current executor.

        Calls `close()` on all executors in the queue (after calling `next()`
        to initialize them) as well as the running executor. Executors should
        clean up when `close()` is called, and the associated generators must
        catch the resulting `GeneratorExit` exception and clean up.

        NOTE(review): qexec.next()/close() are invoked while __exec_mutex is
        held; if a queued generator exits during one of those calls, its
        wrapper's task_done() re-enters __next(), which tries to re-acquire
        this non-reentrant lock -- a potential deadlock with short-lived
        generators. Confirm before relying on flush() under load.
        """
        self.__exec_mutex.acquire()
        try:
            while True:
                # Empty the execution queue, closing all queued generators
                qexec, args, keys = self.get_nowait()
                qexec.next()
                qexec.close()
        except Empty:
            # Close the current executor; the executor is responsible for cleanup.
            if self.__current_exec is not None:
                self.__current_exec.close()
                self.__current_exec = None
        finally:
            self.__exec_mutex.release()
    def executing(self):
        # True while an executor dequeued by __next() has not yet finished.
        self.__exec_mutex.acquire()
        try:
            return self.__current_exec is not None
        finally:
            self.__exec_mutex.release()
    def __next(self, isRunning=False):
        """Dequeue and start the next executor.

        `isRunning` is checked against the current execution status before
        dequeuing and starting the next executor."""
        self.__exec_mutex.acquire()
        # Proceed only when the caller's view matches reality: a task_done()
        # caller (isRunning=True) expects an executor to be current, while a
        # put() caller (isRunning=False) expects the queue to be idle.
        if isRunning != (self.__current_exec is None):
            try:
                self.__current_exec, args, keys = self.get_nowait()
            except Empty:
                self.__current_exec = None
                return
            finally:
                self.__exec_mutex.release()
            # Start the dequeued executor outside the lock, so its generator
            # may safely call back into task_done().
            self.__current_exec(*args, **keys)
        else:
            self.__exec_mutex.release()
if __name__ == '__main__':
    # Demo: run several generator "workers" whose iterations alternate
    # between a worker thread and a simple synchronous event loop.
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
        help='Number of workers to create [%default]')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
        help='Number of times to iterate each worker [%default]')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
        help='Timeout for event loop [%default sec]')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
        help='Worker delay to simulate work [%default sec]')
    (options, args) = optparser.parse_args()
    printLock = threading.Lock()    # serializes console output across threads
    eventq = Queue()                # the synchronous "event loop" queue
    execq = ExecutionQueue()        # workers execute one at a time through this
    def printThread(name, action):
        # Print a progress line tagged with the current thread, under the lock.
        printLock.acquire()
        print "%s loop %s in %s of %d threads" % (name, action,
            threading.currentThread().getName(), threading.activeCount())
        printLock.release()
    def loop(looptime=0.5):
        """A simple event queue loop."""
        # Keep looping while any worker thread is alive or events remain.
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                next = eventq.get(timeout=looptime)
                printThread(" Event", "executing event")
                if callable(next): next()
            except Empty:
                # Timed out with no event; report that the loop is idle.
                printThread(" Event", "running")
    @execute(execq, ThreadExecutor, eventq.put, exc_handler=None)
    def work(wnum, loops=2, worktime=2.0):
        # Each loop iteration does half its work asynchronously (before the
        # first yield) and half synchronously (between the yields).
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True
    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        work(x, loops=options.loops, worktime=options.worktime)(exc_handler=print_exc)
    loop(looptime=options.looptime)
While working on recipe 576957 (Asynchronous subprocesses using asyncore), I noticed recipe 576952 for running inline code asynchronously, and realized it could be generalized to support types of asynchronicity other than Threads as well as different patterns of asynchronicity.
Revision 14: Moved the action of an Executor from __call__() to _execute(). This simplifies calling executor objects and removes the need for subclasses to invoke the superclass __call__(). Also added the flush() method to ExecutionQueue, which empties the queue without executing the queued generators.