A simple, flexible, general-purpose observer pattern.
Observers can be callable objects or objects with a particular named method (handle_notify() by default). Events can be any object, and observers can select which events they are interested in receiving. Support for a number of different types of lightweight event objects is included.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | #!/usr/bin/env python
"""observer.py: A simple, flexible, general-purpose observer pattern.
Observers can be callable objects or objects with a particular named method
(`handle_notify()` by default). Events can be any object, and observers can
select which events they are interested in receiving. Support for a number
of different types of lightweight event objects is included.
"""
import sys
from threading import Lock
from traceback import print_exc
if __name__ == '__main__':
import optparse
__version__ = '$Revision: 2539 $'.split()[1]
__usage__ = 'usage: %prog [options] [test integer]'
class Observable:
"""Implements a simple observable object.
Optionally access the observer dictionary in a threadsafe manner. (This
does not guarantee the thread safety nor order of event notifications!)
An observer registers for events by calling the `obs_add()` method,
specifying the event criteria for which the observer wants to be notified.
An event passed to `_obs_notify()` is checked against the criteria for
each observer by calling the `_obs_check_event()` method.
An observer can be a callable object or an object with a particular named
method (`handle_notify()` by default); when called, the observer is passed
the observed object and event as arguments.
Observers must be hashable, as they are stored internally as keys in a
dictionary.
"""
def __init__(self, default_notify='handle_notify', threadsafe=False):
"""Construct an observable object.
The `default_notify` parameter sets the named method to be called
when sending an event to an observer.
"""
self._observers = {}
self._default_notify = default_notify;
if (threadsafe):
self.__obs_lock = Lock()
else:
self.__obs_lock = None
def _obs_check_observer(self, observer):
"""Validate an observer and convert it to a callable."""
if callable(observer):
return observer
elif hasattr(observer, self._default_notify):
return self._obs_check_observer(getattr(observer, self._default_notify))
else:
raise TypeError('Object is not a valid observer.')
def obs_add(self, observer, criteria=None):
"""Add an observer to this object.
`criteria` specifies the event criteria to listen for. The default
criteria of `None` listens for all events.
Observers are stored as strong references to avoid premature garbage
collection of anonymous observers.
"""
o_callable = self._obs_check_observer(observer)
if self.__obs_lock: self.__obs_lock.acquire()
try:
self._observers[observer] = [o_callable, criteria]
finally:
if self.__obs_lock: self.__obs_lock.release()
def obs_del(self, observer):
"""Remove an observer from this object."""
if self.__obs_lock: self.__obs_lock.acquire()
try:
del self._observers[observer]
finally:
if self.__obs_lock: self.__obs_lock.release()
# Lambdas for testing whether event matches criteria
_obs_event_tests = [
# Event or criteria are None
lambda a, b, o: (a is None) or (b is None),
# Event is equal to criteria
lambda a, b, o: a == b,
# Event and criteria are bit-field compatible
lambda a, b, o: (a & b) != 0,
# Event is a member of criteria
lambda a, b, o: a in b,
# If event is subscriptable but not string-like, check event[0] against criteria
# Try to make sure we don't recurse infinitely
lambda a, b, o: a != str(a) and a[0] != a \
and o._obs_check_event(a[0], b),
# If event is an instance, check event's class against criteria
# Try to make sure we don't recurse infinitely
lambda a, b, o: hasattr(a, '__class__') and a.__class__ != a \
and o._obs_check_event(a.__class__, b)
]
def _obs_check_event(self, event, criteria, tests=_obs_event_tests):
"""Check whether an event meets an observer's criteria.
The `tests` parameter is a list of callables that take three arguments
(the event, the criteria, and this object) and return `True` if the
event meets the criteria.
The default implementation supports the following event types and
criteria, and returns `True` if:
* either the event or criteria is None.
* the event and criteria support the '&' operation, and
`event & criteria` is not zero.
* the event is equal to the criteria or, if the criteria
is a container, the event is in the criteria.
* the `__class__` of the event meets the criteria as above
* if the event is a container, the first element meets the criteria
as above. This is useful for sending attribute change events as
tuples of `(<name>, <old value>, <new value>)`, and using a
sequence of `<name>` as criteria.
"""
for f in tests:
try:
if f(event, criteria, self): return True
except: pass
return False
def _obs_notify(self, event=None):
"""Notify observers of an event if the event meets their criteria.
If an observer raises an exception, the `_obs_exception()` method is
called and the observer is removed from the dictionary.
"""
if self.__obs_lock: self.__obs_lock.acquire()
try:
observers = self._observers.items()
finally:
if self.__obs_lock: self.__obs_lock.release()
for o, o_info in observers:
o_callable, o_criteria = o_info
if self._obs_check_event(event, o_criteria):
try:
o_callable(self, event)
except:
self._obs_exception()
self.obs_del(o)
def _obs_exception(self):
"""Handle an exception raised by an observer.
By default, just prints a traceback to `stderr`."""
print_exc(file=sys.stderr)
if __name__ == '__main__':
class TestObservable(Observable):
def run(self, maxbit, testbit=1):
self._obs_notify(event='start')
while testbit <= maxbit:
self._obs_notify(event=testbit)
self._obs_notify(event=(self, 'tested %d' % testbit))
testbit *= 2
self._obs_notify(event='stop')
class TestObserver:
def highest_bit(self, x):
k = 1
while (x & (x + 1)): x |= (x >> k); k *= 2
return (x + 1) / 2
def handle_notify(self, observed, event):
self.__notifications += 1
if isinstance(event, tuple): event = event[1]
print "Notification %d: %s" % (self.__notifications, event)
def run(self, testval, handler='func', events='bit'):
observable = TestObservable()
handlers = { 'func': self.handle_notify, 'obj': self }
criteria = { 'bit': testval, 'str': ['start','stop'],
'tuple': TestObservable, 'all': None }
observable.obs_add(handlers[handler], criteria[events])
self.__notifications = 0
observable.run(self.highest_bit(testval))
optparser = optparse.OptionParser(usage=__usage__, version=__version__)
optparser.disable_interspersed_args()
optparser.add_option('--handler', type='choice', metavar='TYPE',
choices=['func','obj'], default='func',
help='Handler type to use: func or obj [%default]')
optparser.add_option('--events', type='choice', metavar='TYPE',
choices=['bit','str','tuple','all'], default='bit',
help='Event type to display: bit, str, tuple, or all [%default]')
(options, args) = optparser.parse_args()
if len(args) != 1:
optparser.print_help()
sys.exit(1)
else:
try:
TestObserver().run(int(args[0]), handler=options.handler, events=options.events)
except:
print_exc(file=sys.stderr)
optparser.print_help()
sys.exit(1)
|
In working on recipe 576957 (Asynchronous subprocesses using asyncore), I realized that a large chunk of the code was just an implementation of the observer pattern, and that the implementation could be generalized for reuse (even within that single recipe). Thus, I was motivated to extract and generalize the code and present it as a separate recipe.