Welcome, guest | Sign In | My Account | Store | Cart
NOTE: Recipes have moved! Please visit GitHub.com/activestate/code for the current versions.

Based on the recipe for active objects given in "Concepts, Techniques, and Models of Computer Programming", by Peter van Roy and Seif Haridi, the ActiveObject class wraps an instance of a passive object and forwards messages to this object via a thread-safe message queue. The passive object processes the messages on its own thread, and returns the results to the caller via an AsyncResult object that can be used to block whilst waiting for a result, or to register callbacks to be called when a result is available.

Python, 135 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
An ActiveObject forwards messages to an internal passive object
running on its own thread.

The passive object processes these messages sequentially, and returns the results
or any exceptions to the caller via an AsyncResult object.
"""

from threading import Thread, Event, RLock
from Queue import Queue

class AsyncResult:
    """Represents an asynchronous operation that may not have completed yet.

    The producer finishes the operation by calling succeed() or fail().
    Consumers either read the blocking ``result`` property or register
    callbacks with addCallback() / addErrback().

    Public attributes:
        completed -- set to True by complete().
        failed    -- set to True by fail().
    """
    def __init__(self):
        self.completed = False
        self.failed = False
        self.__wait = Event()      # set by complete(); unblocks ``result``
        self.__callbacks = []      # called with the return value on success
        self.__errbacks = []       # called with the error on failure
        self.__retval = None
        self.__error = None
        # Re-entrant because addCallback() calls addErrback() while it
        # already holds the lock.
        self.__lock = RLock()

    def complete(self):
        """Mark the operation as finished and wake any thread blocked on
        ``result``."""
        self.__lock.acquire()
        try:
            self.completed = True
            self.__wait.set()
        finally:
            # Release even if something above raises, matching the
            # try/finally used by addCallback() and addErrback().
            self.__lock.release()

    def succeed(self, retval):
        """Record ``retval`` as the outcome, complete the operation and
        pass ``retval`` to every registered callback."""
        self.__retval = retval
        self.complete()
        for callback in self.__callbacks:
            callback(retval)
        self.clearCallbacks()

    def fail(self, error):
        """Record ``error`` as the outcome, complete the operation and
        pass ``error`` to every registered errback."""
        self.__error = error
        # Set before complete() so that anyone who observes
        # ``completed`` also sees the right ``failed`` value.
        self.failed = True
        self.complete()
        for errback in self.__errbacks:
            errback(error)
        self.clearCallbacks()

    def clearCallbacks(self):
        """Forget all registered callbacks and errbacks."""
        self.__callbacks = []
        self.__errbacks = []

    def addCallback(self, callback, errback=None):
        """Register ``callback`` for the success case.

        If the operation has already completed successfully the callback
        is called immediately with the stored return value; if it has
        already failed the callback is dropped.  When ``errback`` is
        given it is registered through addErrback().
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if not self.failed:
                    callback(self.__retval)
            else:
                self.__callbacks.append(callback)
            if errback is not None:
                self.addErrback(errback)
        finally:
            self.__lock.release()

    def addErrback(self, errback):
        """Register ``errback`` for the failure case.

        If the operation has already failed the errback is called
        immediately with the stored error; if it has already succeeded
        the errback is dropped.
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if self.failed:
                    errback(self.__error)
            else:
                self.__errbacks.append(errback)
        finally:
            self.__lock.release()

    def __getResult(self):
        """Block until the operation completes, then return its value or
        raise the recorded error."""
        self.__wait.wait()
        if not self.failed:
            return self.__retval
        else:
            raise self.__error
    result=property(__getResult)

       
class Message:
    """Represents a message forwarded to a passive object by an active object

    Calling the message enqueues one invocation of ``fun`` and returns the
    AsyncResult that will receive its outcome.  The queue consumer is
    expected to run ``call()`` on what it dequeues and to report through
    the dequeued object's ``result`` attribute.
    """
    def __init__(self, fun, queue):
        self.fun = fun
        self.queue = queue

    def __call__(self, *args, **kwargs):
        """Enqueue an invocation of ``fun`` with the given arguments.

        Returns the AsyncResult for this particular invocation.
        """
        # Enqueue a per-call copy rather than ``self``.  Otherwise calling
        # the same Message twice before the consumer runs would overwrite
        # the first call's arguments and leave its AsyncResult forever
        # incomplete.
        pending = Message(self.fun, self.queue)
        pending.args = args
        pending.kwargs = kwargs
        pending.result = AsyncResult()
        # Keep the attributes the original implementation exposed on
        # ``self``; they describe the most recent call.
        self.args = args
        self.kwargs = kwargs
        self.result = pending.result
        self.queue.put(pending)
        return pending.result

    def call(self):
        """Run ``fun`` with the stored arguments and return its value."""
        return self.fun(*(self.args), **(self.kwargs))

class ActiveObject:
    """An object that handles messages sequentially on a separate thread.
    Call stop() to terminate the object's internal message loop.

    Attribute access on an ActiveObject is forwarded to a wrapped instance
    of ``klass``: each callable attribute is returned as a Message that,
    when called, queues the invocation for the worker thread.  Anything
    put on the queue after the stop sentinel is left unprocessed.

    Public attributes:
        stopped -- False until the worker thread sees the stop sentinel.
    """
    def __init__(self, klass, *args, **kwargs):
        """Create the passive ``klass(*args, **kwargs)`` instance and start
        the worker thread that will serve it."""
        # Assign before the worker starts, so the worker's later
        # ``stopped = True`` can never be overwritten here, and so
        # __getattr__ (which reads ``self.stopped``) cannot recurse on a
        # missing attribute.
        self.stopped = False
        self.__obj = klass(*args, **kwargs)
        self.__queue = Queue()
        self.__thread = Thread(target=self.__processQueue)
        self.__thread.start()

    def stop(self):
        """Ask the worker thread to exit once it reaches this point in the
        queue.  Returns immediately without waiting for the thread."""
        # The StopIteration class itself serves as the sentinel.
        self.__queue.put(StopIteration)

    def __processQueue(self):
        """Worker loop: run queued messages one at a time until the stop
        sentinel is dequeued."""
        while True:
            message = self.__queue.get()
            if message is StopIteration:
                self.stopped = True
                break
            try:
                retval = message.call()
            except Exception as e:
                # ``as`` form is accepted by Python 2.6+ and Python 3.
                message.result.fail(e)
            else:
                # Outside the ``try`` so that errors raised by result
                # callbacks are not mistaken for failures of the call.
                message.result.succeed(retval)

    def __getattr__(self, attrname):
        """Return a Message wrapping the passive object's ``attrname``.

        Raises AttributeError if the object has been stopped, if the
        passive object lacks the attribute, or if the attribute is not
        callable.
        """
        if self.stopped:
            raise AttributeError("Object is no longer active.")
        fun = getattr(self.__obj, attrname)
        if hasattr(fun, '__call__'):
            return Message(fun, self.__queue)
        else:
            raise AttributeError("Active object does not support this function.")

Usage:

myActiveObject = ActiveObject(MyClass, *args, **kwargs) # args and kwargs are passed to the passive object's __init__ function

asyncResult = myActiveObject.doFoo()
while not asyncResult.completed:
    pass # do something else until the result is available
print asyncResult.result

myActiveObject.doFoo().result # waits for a result and returns it when available

myActiveObject.doBar().addCallback(callback, errback) # calls callback if the operation succeeded, and errback if it failed.

myActiveObject.stop() # explicitly terminates the message loop used by the active object.

Active objects provide a simple approach to message-passing concurrency, where objects running in separate threads communicate with each other via message queues instead of using shared state with explicit locking to control concurrency.

A Windows form with a main window thread and a message loop may be seen as an example of an active object.

Active objects work better in languages with lightweight threads. Because this implementation assigns a new thread to each active object, it is not efficient to create a large number of objects. A more efficient Python implementation would multiplex on a small number of threads in a thread pool, instead of assigning one thread to each active object.

See "Concepts, Techniques, and Models of Computer Programming", by Peter van Roy and Seif Haridi, for an in-depth treatment of active objects, message-passing concurrency and other models used for concurrent programming.

3 comments

Mitch Chapman 12 years, 8 months ago  # | flag

Callback Race Condition? Isn't it possible for an AsyncResult to complete before a client has had a chance to addCallback?

That aside, this recipe is clean and concise. Thanks for providing it!

Dominic Fox (author) 12 years, 8 months ago  # | flag

Race condition. I take your point. We need to synchronize calls to complete(), addCallback() and addErrback():

from threading import Thread, Event, RLock
from Queue import Queue

class AsyncResult:
    """Represents an asynchronous operation that may not have completed yet.

    complete(), addCallback() and addErrback() are serialised on one
    re-entrant lock, so a callback registered while the operation is
    finishing is either queued before completion or invoked immediately
    afterwards.

    Public attributes:
        completed -- set to True by complete().
        failed    -- set to True by fail().
    """
    def __init__(self):
        self.completed = False
        self.failed = False
        self.__wait = Event()      # set by complete(); unblocks ``result``
        self.__callbacks = []      # called with the return value on success
        self.__errbacks = []       # called with the error on failure
        self.__retval = None
        self.__error = None
        # Re-entrant because addCallback() calls addErrback() while it
        # already holds the lock.
        self.__lock = RLock()

    def complete(self):
        """Mark the operation as finished and wake any thread blocked on
        ``result``."""
        self.__lock.acquire()
        try:
            self.completed = True
            self.__wait.set()
        finally:
            self.__lock.release()

    def succeed(self, retval):
        """Record ``retval`` as the outcome, complete the operation and
        pass ``retval`` to every registered callback."""
        self.__retval = retval
        self.complete()
        for callback in self.__callbacks:
            callback(retval)
        self.clearCallbacks()

    def fail(self, error):
        """Record ``error`` as the outcome, complete the operation and
        pass ``error`` to every registered errback."""
        self.__error = error
        self.failed = True
        self.complete()
        for errback in self.__errbacks:
            errback(error)
        self.clearCallbacks()

    def clearCallbacks(self):
        """Forget all registered callbacks and errbacks."""
        self.__callbacks = []
        self.__errbacks = []

    def addCallback(self, callback, errback=None):
        """Register ``callback`` for the success case.

        If the operation has already completed successfully the callback
        is called immediately with the stored return value; if it has
        already failed the callback is dropped.  When ``errback`` is
        given it is registered through addErrback().
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if not self.failed:
                    # Must read the private ``__retval`` slot: there is
                    # no public ``retval`` attribute on this class.
                    callback(self.__retval)
            else:
                self.__callbacks.append(callback)
            if errback is not None:
                self.addErrback(errback)
        finally:
            self.__lock.release()

    def addErrback(self, errback):
        """Register ``errback`` for the failure case.

        If the operation has already failed the errback is called
        immediately with the stored error; if it has already succeeded
        the errback is dropped.
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if self.failed:
                    # Must read the private ``__error`` slot: there is
                    # no public ``error`` attribute on this class.
                    errback(self.__error)
            else:
                self.__errbacks.append(errback)
        finally:
            self.__lock.release()

    def __getResult(self):
        """Block until the operation completes, then return its value or
        raise the recorded error."""
        self.__wait.wait()
        if not self.failed:
            return self.__retval
        else:
            raise self.__error
    result=property(__getResult)
Dominic Fox (author) 12 years, 8 months ago  # | flag

Also. Now I come to test it, the error handling is incorrect also. __processQueue should read as follows:

def __processQueue(self):
    """Worker loop: run queued messages one at a time until the stop
    sentinel (the StopIteration class) is dequeued.

    For every other item the loop runs ``message.call()`` and reports the
    outcome through ``message.result``: succeed(retval) when the call
    returns, fail(error) when it raises an Exception.  On the sentinel it
    sets ``self.stopped`` to True and returns.
    """
    while True:
        message = self.__queue.get()
        if message is StopIteration:
            self.stopped = True
            break
        try:
            retval = message.call()
        except Exception as e:
            # ``as`` form is accepted by Python 2.6+ and Python 3.
            message.result.fail(e)
        else:
            # Outside the ``try`` so that errors raised by result
            # callbacks are not mistaken for failures of the call.
            message.result.succeed(retval)

and, also, callback(self.retval) and errback(self.error) should be callback(self.__retval) and errback(self.__error) in addCallback and addErrback.

Gnah. The lesson staring me in the face here is: write tests! Do it first!

Created by Dominic Fox on Tue, 1 Feb 2005 (PSF)
Python recipes (4591)
Dominic Fox's recipes (4)

Required Modules

Other Information and Tasks