Based on the recipe for active objects given in "Concepts, Techniques, and Models of Computer Programming", by Peter van Roy and Seif Haridi, the ActiveObject class wraps an instance of a passive object and forwards messages to this object via a thread-safe message queue. The passive object processes the messages on its own thread, and returns the results to the caller via an AsyncResult object that can be used to block whilst waiting for a result, or to register callbacks to be called when a result is available.
"""
An ActiveObject forwards messages to an internal passive object
running on its own thread.

The passive object processes these messages sequentially, and returns the results
or any exceptions to the caller via an AsyncResult object.
"""

from threading import Thread, Event, RLock
try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2

class AsyncResult:
    """Represents an asynchronous operation that may not have completed yet."""
    def __init__(self):
        self.completed = False
        self.failed = False
        self.__wait = Event()
        self.__callbacks = []
        self.__errbacks = []
        self.__retval = None
        self.__error = None
        self.__lock = RLock()

    def complete(self):
        # Mark the operation as finished and wake up any thread blocked on .result
        self.__lock.acquire()
        self.completed = True
        self.__wait.set()
        self.__lock.release()

    def succeed(self, retval):
        self.__retval = retval
        self.complete()
        for callback in self.__callbacks:
            callback(retval)
        self.clearCallbacks()

    def fail(self, error):
        self.__error = error
        self.failed = True
        self.complete()
        for errback in self.__errbacks:
            errback(error)
        self.clearCallbacks()

    def clearCallbacks(self):
        self.__callbacks = []
        self.__errbacks = []

    def addCallback(self, callback, errback=None):
        self.__lock.acquire()
        try:
            if self.completed:
                # Already finished: call back immediately instead of queueing
                if not self.failed:
                    callback(self.__retval)
            else:
                self.__callbacks.append(callback)
            if errback is not None:
                self.addErrback(errback)
        finally:
            self.__lock.release()

    def addErrback(self, errback):
        self.__lock.acquire()
        try:
            if self.completed:
                if self.failed:
                    errback(self.__error)
            else:
                self.__errbacks.append(errback)
        finally:
            self.__lock.release()

    def __getResult(self):
        # Blocks until the operation has completed
        self.__wait.wait()
        if not self.failed:
            return self.__retval
        else:
            raise self.__error
    result = property(__getResult)

class Message:
    """Represents a message forwarded to a passive object by an active object"""
    def __init__(self, fun, queue):
        self.fun = fun
        self.queue = queue

    def __call__(self, *args, **kwargs):
        # Record the arguments and enqueue the message; the caller gets an AsyncResult
        self.args = args
        self.kwargs = kwargs
        self.result = AsyncResult()
        self.queue.put(self)
        return self.result

    def call(self):
        return self.fun(*(self.args), **(self.kwargs))

class ActiveObject:
    """An object that handles messages sequentially on a separate thread.
    Call stop() to terminate the object's internal message loop."""
    def __init__(self, klass, *args, **kwargs):
        self.__obj = klass(*args, **kwargs)
        self.__queue = Queue()
        # Set the flag before the thread starts so the message loop can never see it unset
        self.stopped = False
        self.__thread = Thread(target=self.__processQueue)
        self.__thread.start()

    def stop(self):
        # StopIteration is used as a sentinel that ends the message loop
        self.__queue.put(StopIteration)

    def __processQueue(self):
        while True:
            message = self.__queue.get()
            retval = None
            failure = None
            if message is StopIteration:
                self.stopped = True
                break
            try:
                retval = message.call()
            except Exception as e:
                failure = e
            if failure is None:
                message.result.succeed(retval)
            else:
                message.result.fail(failure)

    def __getattr__(self, attrname):
        if self.stopped:
            raise AttributeError("Object is no longer active.")
        fun = getattr(self.__obj, attrname)
        if hasattr(fun, '__call__'):
            return Message(getattr(self.__obj, attrname), self.__queue)
        else:
            raise AttributeError("Active object does not support this function.")

Usage:
myActiveObject = ActiveObject(MyClass, *args, **kwargs) # args and kwargs are passed to the passive object's __init__ function
asyncResult = myActiveObject.doFoo()
while not asyncResult.completed:
    pass # do something else until the result is available
print(asyncResult.result)
myActiveObject.doFoo().result # waits for a result and returns it when available
myActiveObject.doBar().addCallback(callback, errback) # calls callback if the operation succeeded, and errback if it failed.
myActiveObject.stop() # explicitly terminates the message loop used by the active object.
Active objects provide a simple approach to message-passing concurrency, where objects running in separate threads communicate with each other via message queues instead of using shared state with explicit locking to control concurrency.
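As a rough illustration of the difference, here is a minimal Python 3 sketch in which a counter is owned by a single worker thread and is only ever changed in response to messages, so no lock guards the counter itself. The names `counter_worker`, `run_demo` and the `STOP` sentinel are made up for this example and are not part of the recipe.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel that tells the worker to exit


def counter_worker(inbox, outbox):
    """Own the counter exclusively; change it only in response to messages."""
    total = 0
    while True:
        message = inbox.get()
        if message is STOP:
            break
        total += message
    outbox.put(total)  # the final state is reported as a message too


def run_demo(senders=4, messages_per_sender=1000):
    inbox = queue.Queue()
    outbox = queue.Queue()
    worker = threading.Thread(target=counter_worker, args=(inbox, outbox))
    worker.start()

    def send():
        for _ in range(messages_per_sender):
            inbox.put(1)

    threads = [threading.Thread(target=send) for _ in range(senders)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    inbox.put(STOP)  # enqueued only after every sender has finished
    worker.join()
    return outbox.get()
```

The only synchronized structures are the two queues. The senders never touch `total` directly, which is the property the active object pattern relies on.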
A Windows form with a main window thread and a message loop may be seen as an example of an active object.
Active objects work better in languages with lightweight threads. Because this implementation assigns a new thread to each active object, it is not efficient to create a large number of objects. A more efficient Python implementation would multiplex the active objects over a small number of threads in a thread pool, instead of assigning one thread to each active object.
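One possible shape for such a pooled variant is sketched below in Python 3, using `concurrent.futures` from the standard library. This is an assumption about how the multiplexing could be done, not part of the recipe: `PooledActiveObject`, `Counter` and `run_demo` are hypothetical names, and a standard `Future` stands in for `AsyncResult`. Each object keeps its own mailbox, and at most one pool task drains that mailbox at any time, so messages to a given object are still processed sequentially.

```python
import threading
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor


class PooledActiveObject:
    """Hypothetical active object that borrows threads from a shared pool."""

    def __init__(self, obj, pool):
        self._obj = obj
        self._pool = pool
        self._mailbox = deque()
        self._lock = threading.Lock()
        self._scheduled = False  # True while a drain task is queued or running

    def send(self, method_name, *args, **kwargs):
        future = Future()
        with self._lock:
            self._mailbox.append((future, method_name, args, kwargs))
            if not self._scheduled:
                self._scheduled = True
                self._pool.submit(self._drain)
        return future

    def _drain(self):
        # Process this object's messages one at a time until the mailbox is empty.
        while True:
            with self._lock:
                if not self._mailbox:
                    self._scheduled = False
                    return
                future, method_name, args, kwargs = self._mailbox.popleft()
            try:
                value = getattr(self._obj, method_name)(*args, **kwargs)
            except Exception as error:
                future.set_exception(error)
            else:
                future.set_result(value)


class Counter:
    """Passive object used only for the demonstration."""

    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount
        return self.value


def run_demo(objects=50, messages=20, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        actors = [PooledActiveObject(Counter(), pool) for _ in range(objects)]
        futures = [[a.send("add", 1) for _ in range(messages)] for a in actors]
        return [[f.result() for f in per_actor] for per_actor in futures]
```

With these defaults, 50 objects share 4 threads. One trade-off of this sketch is fairness: an object whose mailbox never empties keeps its pool thread busy, so a fuller implementation might hand the thread back after a bounded number of messages.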
See "Concepts, Techniques, and Models of Computer Programming", by Peter van Roy and Seif Haridi, for an in-depth treatment of active objects, message-passing concurrency and other models used for concurrent programming.
Callback Race Condition? Isn't it possible for an AsyncResult to complete before a client has had a chance to addCallback?
That aside, this recipe is clean and concise. Thanks for providing it!
Race condition. I take your point. We need to synchronize calls to complete(), addCallback() and addErrback():
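The snippet originally posted with this comment is not reproduced here. As a stand-in, the following is a minimal Python 3 sketch of the general idea, with a hypothetical `SafeResult` class that is not the recipe's `AsyncResult`: completion and callback registration take the same lock, so a callback is either queued before completion or invoked straight away after it, and in both cases it runs exactly once.

```python
import threading


class SafeResult:
    """Hypothetical result holder; callbacks fire once whichever side wins the race."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = threading.Event()
        self._value = None
        self._callbacks = []

    def succeed(self, value):
        with self._lock:
            self._value = value
            self._done.set()
            # Take ownership of the queued callbacks while holding the lock
            pending, self._callbacks = self._callbacks, []
        for callback in pending:  # run outside the lock
            callback(value)

    def add_callback(self, callback):
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        # Already completed: the value can no longer change, so call directly
        callback(self._value)
```

Running the callbacks outside the lock keeps a slow callback from blocking other threads that want to register or complete. It does mean that a late registration may run before earlier callbacks have finished.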
Also. Now I come to test it, the error handling is incorrect also. __processQueue should read as follows:
and, also, callback(self.retval) and errback(self.error) should be callback(self.__retval) and errback(self.__error) in addCallback and addErrback.
Gnah. The lesson staring me in the face here is: write tests! Do it first!