Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python

"""observer.py: A simple, flexible, general-purpose observer pattern.

Observers can be callable objects or objects with a particular named method
(`handle_notify()` by default). Events can be any object, and observers can
select which events they are interested in receiving. Support for a number
of different types of lightweight event objects is included.
"""

import sys
from threading import Lock
from traceback import print_exc

if __name__ == '__main__':
    import optparse

__version__ = '$Revision: 2539 $'.split()[1]

__usage__ = 'usage: %prog [options] [test integer]'


class Observable:
    """Implements a simple observable object.

    Optionally access the observer dictionary in a threadsafe manner. (This
    does not guarantee the thread safety nor order of event notifications!)

    An observer registers for events by calling the `obs_add()` method,
    specifying the event criteria for which the observer wants to be notified.
    An event passed to `_obs_notify()` is checked against the criteria for
    each observer by calling the `_obs_check_event()` method.

    An observer can be a callable object or an object with a particular named
    method (`handle_notify()` by default); when called, the observer is passed
    the observed object and event as arguments.

    Observers must be hashable, as they are stored internally as keys in a
    dictionary.
    """

    def __init__(self, default_notify='handle_notify', threadsafe=False):
        """Construct an observable object.

        The `default_notify` parameter sets the named method to be called
        when sending an event to an observer.

        If `threadsafe` is true, a `threading.Lock` guards every access to
        the observer dictionary; otherwise no locking is performed.
        """
        # Maps observer -> [callable, criteria].
        self._observers = {}
        self._default_notify = default_notify
        if threadsafe:
            self.__obs_lock = Lock()
        else:
            self.__obs_lock = None

    def _obs_check_observer(self, observer):
        """Validate an observer and convert it to a callable.

        Returns `observer` itself if it is callable, otherwise the result of
        validating its `default_notify` attribute. Raises `TypeError` if
        neither applies.
        """
        if callable(observer):
            return observer
        elif hasattr(observer, self._default_notify):
            return self._obs_check_observer(getattr(observer, self._default_notify))
        else:
            raise TypeError('Object is not a valid observer.')

    def obs_add(self, observer, criteria=None):
        """Add an observer to this object.

        `criteria` specifies the event criteria to listen for. The default
        criteria of `None` listens for all events.

        Adding an observer that is already registered replaces its criteria.
        Raises `TypeError` if `observer` is not a valid observer.

        Observers are stored as strong references to avoid premature garbage
        collection of anonymous observers.
        """
        # Validate before taking the lock so a bad observer never holds it.
        o_callable = self._obs_check_observer(observer)
        lock = self.__obs_lock
        if lock is not None:
            lock.acquire()
        try:
            self._observers[observer] = [o_callable, criteria]
        finally:
            if lock is not None:
                lock.release()

    def obs_del(self, observer):
        """Remove an observer from this object.

        Raises `KeyError` if `observer` is not currently registered.
        """
        lock = self.__obs_lock
        if lock is not None:
            lock.acquire()
        try:
            del self._observers[observer]
        finally:
            if lock is not None:
                lock.release()

    # Lambdas for testing whether event matches criteria.
    # Each takes (event, criteria, observable) and is tried in order; a test
    # that raises is treated as "no match" by `_obs_check_event()`.
    _obs_event_tests = [
        # Event or criteria are None
        lambda a, b, o: (a is None) or (b is None),
        # Event is equal to criteria
        lambda a, b, o: a == b,
        # Event and criteria are bit-field compatible
        lambda a, b, o: (a & b) != 0,
        # Event is a member of criteria
        lambda a, b, o: a in b,
        # If event is subscriptable but not string-like, check event[0] against criteria
        # Try to make sure we don't recurse infinitely
        lambda a, b, o: (a != str(a) and a[0] != a
                         and o._obs_check_event(a[0], b)),
        # If event is an instance, check event's class against criteria
        # Try to make sure we don't recurse infinitely
        lambda a, b, o: (hasattr(a, '__class__') and a.__class__ != a
                         and o._obs_check_event(a.__class__, b)),
    ]

    def _obs_check_event(self, event, criteria, tests=_obs_event_tests):
        """Check whether an event meets an observer's criteria.

        The `tests` parameter is a list of callables that take three arguments
        (the event, the criteria, and this object) and return `True` if the
        event meets the criteria.

        The default implementation supports the following event types and
        criteria, and returns `True` if:
        * either the event or criteria is None.
        * the event and criteria support the '&' operation, and
            `event & criteria` is not zero.
        * the event is equal to the criteria or, if the criteria
            is a container, the event is in the criteria.
        * the `__class__` of the event meets the criteria as above
        * if the event is a container, the first element meets the criteria
            as above. This is useful for sending attribute change events as
            tuples of `(<name>, <old value>, <new value>)`, and using a
            sequence of `<name>` as criteria.

        Returns `False` if no test matches.
        """
        for f in tests:
            try:
                if f(event, criteria, self):
                    return True
            except Exception:
                # A test that does not apply to these operand types (e.g.
                # `&` on strings) simply fails to match. KeyboardInterrupt
                # and SystemExit are deliberately not swallowed.
                pass
        return False

    def _obs_notify(self, event=None):
        """Notify observers of an event if the event meets their criteria.

        If an observer raises an exception, the `_obs_exception()` method is
        called and the observer is removed from the dictionary.
        `KeyboardInterrupt` and `SystemExit` are not intercepted and
        propagate to the caller.
        """
        lock = self.__obs_lock
        if lock is not None:
            lock.acquire()
        try:
            # Take a real copy: the loop below may remove observers, and
            # observers may add/remove registrations while being notified.
            # Iterating a live dict view would fail when the dict changes.
            observers = list(self._observers.items())
        finally:
            if lock is not None:
                lock.release()
        # Callbacks run without the lock held, so they may call obs_add()
        # and obs_del() themselves.
        for o, o_info in observers:
            o_callable, o_criteria = o_info
            if not self._obs_check_event(event, o_criteria):
                continue
            try:
                o_callable(self, event)
            except Exception:
                # Must run inside the handler so the active exception is
                # still available to `_obs_exception()`.
                self._obs_exception()
                try:
                    self.obs_del(o)
                except KeyError:
                    # Already unregistered (e.g. by the observer itself).
                    pass

    def _obs_exception(self):
        """Handle an exception raised by an observer.

        Called from within the `except` block in `_obs_notify()`.
        By default, just prints a traceback to `stderr`.
        """
        print_exc(file=sys.stderr)


if __name__ == '__main__':
    # Command-line self-test: registers one observer with the selected
    # handler type and event criteria, then fires a fixed event sequence.

    class TestObservable(Observable):
        """Observable that emits a fixed sequence of test events."""

        def run(self, maxbit, testbit=1):
            """Emit 'start', then per-bit events up to `maxbit`, then 'stop'.

            For each power of two from `testbit` through `maxbit`, sends the
            integer itself and a `(self, 'tested <n>')` tuple.
            """
            self._obs_notify(event='start')
            while testbit <= maxbit:
                self._obs_notify(event=testbit)
                self._obs_notify(event=(self, 'tested %d' % testbit))
                testbit *= 2
            self._obs_notify(event='stop')


    class TestObserver:
        """Observer that counts and prints the notifications it receives."""

        def highest_bit(self, x):
            """Return the value of the highest set bit of `x`."""
            k = 1
            # Smear the top bit downwards until x is all ones below it.
            while x & (x + 1):
                x |= x >> k
                k *= 2
            # Floor division keeps the result an integer on every Python
            # version (plain `/` yields a float under true division).
            return (x + 1) // 2

        def handle_notify(self, observed, event):
            """Count the notification and print it."""
            self.__notifications += 1
            if isinstance(event, tuple):
                event = event[1]
            # A single parenthesised argument prints identically whether
            # `print` is a statement or a function.
            print("Notification %d: %s" % (self.__notifications, event))

        def run(self, testval, handler='func', events='bit'):
            """Register this observer on a fresh `TestObservable` and run it.

            `handler` selects a bound method ('func') or the object itself
            ('obj'); `events` selects the criteria to register with.
            """
            observable = TestObservable()
            handlers = {'func': self.handle_notify, 'obj': self}
            criteria = {'bit': testval, 'str': ['start', 'stop'],
                        'tuple': TestObservable, 'all': None}
            observable.obs_add(handlers[handler], criteria[events])
            self.__notifications = 0
            observable.run(self.highest_bit(testval))


    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--handler', type='choice', metavar='TYPE',
            choices=['func','obj'], default='func',
            help='Handler type to use: func or obj [%default]')
    optparser.add_option('--events', type='choice', metavar='TYPE',
            choices=['bit','str','tuple','all'], default='bit',
            help='Event type to display: bit, str, tuple, or all [%default]')
    (options, args) = optparser.parse_args()

    if len(args) != 1:
        optparser.print_help()
        sys.exit(1)
    else:
        try:
            TestObserver().run(int(args[0]), handler=options.handler, events=options.events)
        except Exception:
            # Report the failure (e.g. a non-integer argument) and show usage;
            # KeyboardInterrupt and SystemExit are left to propagate.
            print_exc(file=sys.stderr)
            optparser.print_help()
            sys.exit(1)

Diff to Previous Revision

--- revision 15 2012-01-06 17:58:23
+++ revision 16 2012-12-06 19:23:11
@@ -15,7 +15,7 @@
 if __name__ == '__main__':
     import optparse
 
-__version__ = '$Revision: 2368 $'.split()[1]
+__version__ = '$Revision: 2539 $'.split()[1]
 
 __usage__ = 'usage: %prog [options] [test integer]'
 
@@ -194,10 +194,10 @@
     optparser.disable_interspersed_args()
     optparser.add_option('--handler', type='choice', metavar='TYPE',
             choices=['func','obj'], default='func',
-            help='Handler type to use (func or obj, default %default)')
+            help='Handler type to use: func or obj [%default]')
     optparser.add_option('--events', type='choice', metavar='TYPE',
             choices=['bit','str','tuple','all'], default='bit',
-            help='Event type to display (bit, str, tuple, or all, default %default)')
+            help='Event type to display: bit, str, tuple, or all [%default]')
     (options, args) = optparser.parse_args()
 
     if len(args) != 1:
@@ -205,8 +205,7 @@
         sys.exit(1)
     else:
         try:
-            TestObserver().run(int(args[0]),
-                    handler=options.handler, events=options.events)
+            TestObserver().run(int(args[0]), handler=options.handler, events=options.events)
         except:
             print_exc(file=sys.stderr)
             optparser.print_help()

History