Welcome, guest | Sign In | My Account | Store | Cart

A framework for executing inline code, contained in a generator, across multiple execution contexts, by pairing it with an executor that handles the context switching at each yield. An example of a generator which executes some iterations synchronously and some asynchronously is provided. The framework is general enough to be applied to many different coroutine situations.

Python, 407 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python

"""worker.py: Executes inline code across multiple execution contexts.

The inline code to be executed is contained in a generator, which contains a
`yield` statement to signal each context change. A decorator added to the
generator function wraps the returned generator in the executor when the
generator function is called. Calling the returned executor iterates the
generator to completion, handling the context switching at each `yield`. The
executor and generator can communicate values through the `yield` statements
and `send()` method of the generator.

An example is provided of an executor which executes alternate iterations of a
generator asynchronously (in a `Thread`) and synchronously (in an event loop).

References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""

import threading
from sys import exc_info
from traceback import print_exc
from functools import partial, wraps
from Queue import Queue, Empty

if __name__ == '__main__':
    import optparse
    from time import sleep

# Revision number: the second whitespace-separated field of the keyword
# string below (presumably expanded by the version-control system -- confirm).
__version__ = '$Revision: 2539 $'.split()[1]

# Usage template handed to the command-line option parser in the demo script.
__usage__ = 'usage: %prog [options]'


def execute(exec_factory, *exargs, **exkeys):
    """Build a decorator that wraps a generator function's result in an executor.

    Calling the decorated generator function instantiates the generator and
    passes it to `exec_factory`, returning whatever the factory returns
    (normally a callable executor which iterates the generator to completion).

    `exec_factory` is any callable (class or function) taking the generator
    as its first positional argument.

    Arguments can be supplied at three separate points:
    * to the decorator itself (`exargs` and `exkeys`); these are forwarded to
        `exec_factory` after the generator;
    * to the decorated generator function; these are forwarded untouched to
        the original generator function; and
    * to the executor returned by the decorated generator function.
    """
    def decorate(genfunc):
        def make_executor(*genargs, **genkeys):
            # Instantiate the generator first, then hand it to the factory
            # together with the arguments captured by the decorator.
            return exec_factory(genfunc(*genargs, **genkeys), *exargs, **exkeys)
        # Preserve the name/docstring of the generator function.
        return wraps(genfunc)(make_executor)
    return decorate


class Executor:
    """A skeletal base class for executors.

    Delegates `next()`, `send()`, `throw()` and `close()` to the enclosed
    generator so as not to change the signature, while tracking whether the
    generator has exited. Subclasses must implement the `_execute()` method.
    """
    def __init__(self, generator, exc_handler=print_exc):
        """Construct an executor.

        `generator` must provide `next`, `send`, `throw` and `close`
        attributes; they are looked up once, here.

        If `exc_handler` is not `None` and any method of the generator raises
        an exception other than `StopIteration`, `exc_handler` will be called
        (with no arguments); otherwise, exceptions (including
        `StopIteration`) are reraised to the caller. Thus, an implementation
        of Executor can call the exception handler by calling
        `self.throw(*sys.exc_info())`; the exception will be handled by the
        generator, handled by the exception handler (if any), or thrown back
        to the caller.
        """
        self.__generator = generator
        self.__executing = False
        self.__exc_handler = exc_handler
        # Cache the bound generator methods so that they remain callable
        # through `__call_gen()` independently of `self.__generator`.
        self.__throw = generator.throw
        self.__send = generator.send
        self.__next = generator.next
        self.__close = generator.close

    def __iter__(self): return self

    def __call_gen(self, method, *args, **keys):
        """Call a generator method, routing any exit to `_handle_exit()`.

        Once the generator has been discarded, raises `StopIteration` if
        there is no exception handler and returns `None` otherwise.
        """
        if not self.stopped():
            try:
                return method(*args, **keys)
            except StopIteration:
                return self._handle_exit(True)
            except:
                # Deliberately broad: every other exception is an abnormal
                # generator exit and goes through the handler logic.
                return self._handle_exit(False)
        elif self.__exc_handler is None: raise StopIteration()
        else: return None

    def __call__(self, *args, **keys):
        """Start the executor. May only be called once per executor.

        Returns the executor, as a convenience for chained calls. Raises
        `ValueError` if the executor has already been started.

        Subclasses should not override this method, but should implement the
        `_execute()` method to iterate the generator to completion, changing
        contexts as appropriate at each `yield` statement.

        If the `exc_handler` key is present, it will be removed and (unless
        it is `None`) will override the default exception handler. All other
        arguments are passed on to `_execute()`.
        """
        if self.__executing:
            raise ValueError("executor already complete." if self.stopped()
                             else "executor already executing.")
        self.__executing = True
        exc_handler = keys.pop('exc_handler', None)
        if exc_handler is not None: self.__exc_handler = exc_handler
        try:
            self._execute(*args, **keys)
        except:
            # Offer the failure to the generator; if the generator does not
            # handle it, `throw()` applies the exception-handler policy.
            self.throw(*exc_info())
        return self

    def _execute(self):
        """Start the executor.

        Subclasses must implement the `_execute()` method to iterate the
        generator to completion, changing contexts as appropriate at each
        `yield` statement. The base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError("_execute() must be implemented in subclass")

    def _handle_exit(self, isStop=False):
        """Handle a generator exit.

        Discards the generator, so that generator exit can be checked by
        calling `stopped()`, rather than wrapping every call to the generator
        in a `try...except` clause. Calls `exc_handler` if the generator
        raises an exception other than `StopIteration`; reraises exceptions
        (including `StopIteration`) if no `exc_handler` is set.

        `isStop` is `True` if the generator raised `StopIteration`.

        Must be called while an exception is being handled, because the
        no-handler path uses a bare `raise`. Returns `None` when it returns.
        """
        self.__generator = None
        if not self.__exc_handler: raise
        elif not isStop: self.__exc_handler()
        return None

    def stopped(self):
        """Check whether the generator has exited (and been discarded)."""
        return self.__generator is None

    def throw(self, *args, **keys):
        """Delegate to the generator's `throw()`; see `__call_gen()`."""
        return self.__call_gen(self.__throw, *args, **keys)

    def close(self):
        """Delegate to the generator's `close()`; see `__call_gen()`."""
        return self.__call_gen(self.__close)

    def next(self):
        """Delegate to the generator's `next()`; see `__call_gen()`."""
        return self.__call_gen(self.__next)

    def send(self, value):
        """Delegate to the generator's `send()`; see `__call_gen()`."""
        return self.__call_gen(self.__send, value)


class ThreadExecutor(Executor):
    """Executes alternate iterations asynchronously and synchronously.

    Odd-numbered iterations run in a freshly started thread; even-numbered
    iterations are handed to a callable (the synchronizer), which usually
    queues them into an event queue.
    """
    def __init__(self, generator, synchronizer, exc_handler=print_exc):
        """Construct a threaded executor.

        `synchronizer` is a callable which receives a zero-argument callable
        and arranges for it to run in the synchronous context, usually by
        queueing it in an event dispatch queue.
        """
        self.__synchronizer = synchronizer
        Executor.__init__(self, generator, exc_handler)

    def _execute(self):
        """Unless stopped, iterate once in a new thread, then call `__finish()`."""
        if self.stopped():
            return

        def async_step():
            # Runs in the worker thread: one iteration, then hand over to
            # the synchronous half of the cycle.
            self.next()
            self.__finish()

        try:
            threading.Thread(target=async_step).start()
        except:
            self.throw(*exc_info())

    def __finish(self):
        """Unless stopped, iterate once via the synchronizer, then call `_execute()`."""
        if self.stopped():
            return

        def sync_step():
            # Runs wherever the synchronizer invokes it: one iteration, then
            # start the next asynchronous half of the cycle.
            return self.next(), self._execute()

        try:
            self.__synchronizer(sync_step)
        except:
            self.throw(*exc_info())


class GeneratorWrapper(Executor):
    """An executor which turns a generator into a callable."""
    def __init__(self, generator, iterate_once=False, exc_handler=None):
        """Wrap `generator`, optionally priming it.

        When `iterate_once` is true, `next()` is called on the generator
        right after construction and the yielded value is thrown away, so
        that the arguments of the very first `__call__()` can be delivered
        with `send()`. Priming is unnecessary if the first `__call__()`
        passes no arguments or a single `None`, because that first iteration
        succeeds anyway.

        The exception handler defaults to `None` here, so exceptions are
        raised to the caller of `__call__()`.
        """
        Executor.__init__(self, generator, exc_handler)
        if iterate_once:
            generator.next()

    def __call__(self, *args, **keys):
        """Iterate the generator once and return the yielded value.

        The call arguments are packaged as follows before being passed on:
        * no arguments at all: `next()` is called;
        * one positional argument: that argument is sent;
        * several positional arguments: the `args` tuple is sent;
        * keyword arguments only: the `keys` dict is sent;
        * both kinds: the pair `(args, keys)` is sent.

        Overrides `__call__()` rather than `_execute()`, because it doesn't
        iterate to completion.
        """
        if keys:
            return self.send((args, keys) if args else keys)
        if not args:
            return self.next()
        if len(args) == 1:
            return self.send(args[0])
        return self.send(args)


class QueuedGeneratorWrapper:
    """Proxy for a generator that reports its termination to a queue.

    The first time any delegated call raises, the generator is dropped and
    `task_done()` is called on the queue before the exception is reraised.
    """
    def __init__(self, generator, queue):
        self.__queue = queue
        self.__generator = generator

    def __call_gen(self, method, *args, **keys):
        """Invoke `method`, notifying the queue if it raises.

        Raises `StopIteration` without calling `method` once the generator
        has been dropped.
        """
        if self.__generator is None:
            raise StopIteration()
        try:
            result = method(*args, **keys)
        except:
            # Any exception counts as termination: drop the generator,
            # tell the queue, and let the exception continue upwards.
            self.__generator = None
            self.__queue.task_done()
            raise
        return result

    def __getattr__(self, attr):
        """Look up `attr` on the generator; callables come back guarded.

        Raises `AttributeError` once the generator has been dropped.
        """
        target = self.__generator
        if target is None:
            raise AttributeError("no generator in QueuedGeneratorWrapper")
        value = getattr(target, attr)
        if not callable(value):
            return value
        return partial(QueuedGeneratorWrapper.__call_gen, self, value)


class ExecutionQueue(Queue):
    """A queue of Executors which are dequeued and executed in sequence.

    An instance of this class can be passed to the `@execute` decorator,
    followed by the executor factory and its arguments that will be used to
    execute the decorated generator function.  When the executors returned by
    the decorated generator functions are called, they will be queued for
    execution rather than executing immediately.

    Queue entries are `(executor, args, keys)` tuples; the executor is
    called as `executor(*args, **keys)` when its turn comes.
    """
    def __init__(self, *qargs, **qkeys):
        """Construct an execution queue.

        `qargs` and `qkeys` are passed to the underlying `Queue` object.
        """
        Queue.__init__(self, *qargs, **qkeys)
        self.__current_exec = None
        # Re-entrant on purpose: while `flush()` holds the lock, closing an
        # executor may make its generator wrapper call `task_done()`, which
        # re-enters `__next()` on the same thread. A plain Lock deadlocks.
        self.__exec_mutex = threading.RLock()
        # True while `flush()` is draining; makes `__next()` a no-op so a
        # flush never starts the executors it is about to discard.
        self.__flushing = False

    def __call__(self, generator, exec_class, *exargs, **exkeys):
        """Create an executor and return a function which will queue it.

        Wrap the generator in an object which calls `task_done()` on this
        queue when the generator exits, which in turn starts the next queued
        executor. The returned function queues the executor, together with
        the arguments it is called with, using `put_nowait()`.
        """
        executor = exec_class(QueuedGeneratorWrapper(generator, self), *exargs, **exkeys)
        return lambda *args, **keys: self.put_nowait((executor, args, keys))

    def task_done(self):
        """Start up the next task in the queue as each one completes."""
        Queue.task_done(self)
        self.__next(isRunning=True)

    def put(self, item, *args, **kwargs):
        """Queue an executor, along with its arguments.

        `item` is an `(executor, args, keys)` tuple; the remaining arguments
        are passed to `Queue.put()`. Start executing the queue if nothing is
        currently executing."""
        Queue.put(self, item, *args, **kwargs)
        self.__next()

    def flush(self):
        """Empty the execution queue and then close the current executor.

        Calls `close()` on all executors in the queue (after calling `next()`
        to initialize them) as well as the running executor. Executors should
        clean up when `close()` is called, and the associated generators must
        catch the resulting `GeneratorExit` exception and clean up.

        No queued executor is started while the flush is in progress, even if
        one of the flushed executors reports completion through
        `task_done()`. Exceptions raised by an executor's `next()` or
        `close()` propagate to the caller.
        """
        self.__exec_mutex.acquire()
        was_flushing = self.__flushing
        self.__flushing = True
        try:
            while True:
                # Only an empty queue ends the drain; an `Empty` raised by an
                # executor itself must not be mistaken for it.
                try:
                    qexec, args, keys = self.get_nowait()
                except Empty:
                    break
                qexec.next()
                qexec.close()
            # Close the current executor; the executor is responsible for cleanup.
            if self.__current_exec is not None:
                self.__current_exec.close()
                self.__current_exec = None
        finally:
            self.__flushing = was_flushing
            self.__exec_mutex.release()

    def executing(self):
        """Return `True` if an executor is currently recorded as running."""
        self.__exec_mutex.acquire()
        try:
            return self.__current_exec is not None
        finally:
            self.__exec_mutex.release()

    def __next(self, isRunning=False):
        """Dequeue and start the next executor.

        `isRunning` is checked against the current execution status before
        dequeuing and starting the next executor: with `isRunning` false the
        call proceeds only when nothing is executing, with `isRunning` true
        only when something is. Does nothing during a `flush()`."""
        self.__exec_mutex.acquire()
        try:
            if self.__flushing or isRunning == (self.__current_exec is None):
                return
            try:
                executor, args, keys = self.get_nowait()
            except Empty:
                self.__current_exec = None
                return
            self.__current_exec = executor
        finally:
            self.__exec_mutex.release()
        # Call through the local reference, outside the lock, so a concurrent
        # change of `__current_exec` cannot redirect or break the call.
        executor(*args, **keys)


if __name__ == '__main__':
    # Demonstration: queue several workers on an ExecutionQueue. Each worker
    # alternates between a slow step run in its own thread and a short step
    # run from the event loop in the main thread.
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--workers', type='int', metavar='N', default=4,
            help='Number of workers to create [%default]')
    optparser.add_option('--loops', type='int', metavar='N', default=2,
            help='Number of times to iterate each worker [%default]')
    optparser.add_option('--looptime', type='float', metavar='SECONDS', default=0.5,
            help='Timeout for event loop [%default sec]')
    optparser.add_option('--worktime', type='float', metavar='SECONDS', default=2.0,
            help='Worker delay to simulate work [%default sec]')
    (options, args) = optparser.parse_args()

    printLock = threading.Lock()    # serializes output from all threads
    eventq = Queue()                # callables to run in the main thread
    execq = ExecutionQueue()        # runs the workers one after another

    def printThread(name, action):
        """Print a status line naming the current thread and thread count."""
        printLock.acquire()
        try:
            # Single-argument call form: prints the same text whether
            # `print` is a statement or a function.
            print("%s loop %s in %s of %d threads" % (name, action,
                threading.currentThread().getName(), threading.activeCount()))
        finally:
            # Release even if printing fails, so other threads are not
            # blocked forever.
            printLock.release()

    def loop(looptime=0.5):
        """A simple event queue loop.

        Runs until no other thread is alive and the event queue is empty,
        waiting at most `looptime` seconds for each event.
        """
        while threading.activeCount() > 1 or not eventq.empty():
            try:
                event = eventq.get(timeout=looptime)
            except Empty:
                printThread(" Event", "running")
            else:
                printThread(" Event", "executing event")
                if callable(event): event()

    # `execq` is the executor factory here: it builds a ThreadExecutor whose
    # synchronizer is `eventq.put` and returns a function that queues it.
    @execute(execq, ThreadExecutor, eventq.put, exc_handler=None)
    def work(wnum, loops=2, worktime=2.0):
        for count in range(loops):
            # Work performed in separate thread
            printThread("Worker %d loop %d" % (wnum+1, count+1), "starting")
            sleep(worktime)
            printThread("Worker %d loop %d" % (wnum+1, count+1), "ending")
            yield True
            # Work performed in event queue
            printThread("Worker %d loop %d" % (wnum+1, count+1), "finishing")
            yield True

    # Create and queue the workers, and then loop the event queue
    for x in range(options.workers):
        # The second call queues the executor; `exc_handler` is delivered to
        # the executor when the queue eventually starts it.
        work(x, loops=options.loops, worktime=options.worktime)(exc_handler=print_exc)
    loop(looptime=options.looptime)

While working on recipe 576957 (Asynchronous subprocesses using asyncore), I noticed recipe 576952 for running inline code asynchronously, and realized it could be generalized to support types of asynchronicity other than Threads as well as different patterns of asynchronicity.

Revision 14: Move the action of an Executor from __call__() to _execute() in order to better handle the calling of executor objects and remove the need to call the __call__() method of the superclasses, and add the flush() method to the ExecutionQueue to empty the queue without executing.