The observer pattern is implemented using an observable descriptor. One creates instances of observable in a class, which allows observers to subscribe to changes in the observable. Assigning a value to the observable causes the subscribers to be notified. An observable can subscribe to another observable, in which case changes to the second propagate to subscribers of the first. The subscribe method returns a Subscription object. When this object is deleted or becomes unreferenced, the subscription is cancelled.
This version is compatible with Python 3.0. The example uses unittest to help in understanding the functionality.
import weakref
import sys
import threading # for lock
class RecursionError(RuntimeError):
    """Signals an attempt to set an observable while it is notifying.

    Raised by the observable's set method when it is entered again before
    the notification started by an earlier call has finished.

    NOTE: from Python 3.5 on this definition shadows the builtin exception
    of the same name inside this module; both derive from RuntimeError.
    """
class _subscription:
    """Handle returned by subscribing to an observable.

    The handle keeps the subscriber entry and a weak reference to the
    observed object.  Calling cancel(), or letting the handle be finalized,
    removes the entry from the observed object through its ``_cancel``
    method.  When the observed object no longer exists there is nothing
    left to cancel and both operations do nothing.
    """

    def __init__(self, subscriber, observed):
        """Store the entry to cancel and a weak reference to its owner.

        subscriber -- the entry that was added to the observed object's
                      subscriber list; handed back to ``_cancel`` unchanged.
        observed   -- the object owning that list; only weakly referenced so
                      the handle does not keep it alive.

        Raises TypeError if ``observed`` cannot be weakly referenced.
        """
        # Take the weak reference first: if it raises, ``subscriber`` is
        # never stored and __del__ finds nothing to cancel.
        self.observed = weakref.ref(observed)
        self.subscriber = subscriber

    def cancel(self):
        """Cancel the subscription now.

        Safe to call more than once: the entry is forgotten on the first
        call, so later calls (and the finalizer) do nothing.
        """
        entry, self.subscriber = self.subscriber, None
        if not entry:
            return
        target = self.observed()
        if target is not None:
            target._cancel(entry)

    def __del__(self):
        # The finalizer also runs for instances whose __init__ failed, so
        # the attribute may be missing altogether.
        if getattr(self, "subscriber", None):
            self.cancel()
class _observed:
    '''Per-instance state behind an observable: current value + subscribers.

    Half-hidden class; only 'observable' is meant to construct these.
    Each entry of ``subscribers`` is a ``(callback, exception_handler)``
    tuple, where ``exception_handler`` may be None.'''

    def __init__(self):
        self.subscribers = []
        self._value = None
        # Held while subscribers are being notified; setvalu() uses it to
        # detect being entered again during a notification.
        self._changeLock = threading.Lock()

    def __call__(self):
        """Return the value most recently stored by setvalu()."""
        return self._value

    def _notifySubscribers(self, value):
        """Call every subscriber callback with ``value``.

        An exception raised by a callback is passed to that entry's handler.
        It is re-raised (ending the notification round) only when a handler
        exists and returns a false value; when the entry has no handler the
        exception is discarded.
        """
        for callback, on_error in self._callbacks():
            try:
                callback(value)
            except Exception as exc:
                handled = on_error(exc) if on_error else True
                if not handled:
                    raise  # the handler declined the exception

    def setvalu(self, value):
        """Store ``value`` and notify subscribers, but only if it changed.

        Raises RecursionError when the change lock is already held, i.e.
        while an earlier setvalu() on this object is still notifying; the
        stored value is left untouched in that case.
        """
        if not (self._value != value):
            return
        if not self._changeLock.acquire(False):  # non-blocking
            raise RecursionError("Attempted recursion into observable's set method.")
        self._value = value
        try:
            self._notifySubscribers(value)
        finally:
            self._changeLock.release()

    def subscribe(self, obsv, exceptionInfo = None):
        """Register ``obsv`` and return the subscription handle.

        obsv          -- a callable taking the new value, or another
                         _observed, in which case its setvalu method is
                         registered so changes chain through to it.
        exceptionInfo -- optional handler called with an exception raised
                         by the callback; see _notifySubscribers.
        """
        if isinstance(obsv, _observed):
            callback = obsv.setvalu
        else:
            callback = obsv
        entry = (callback, exceptionInfo)
        self.subscribers.append(entry)
        return _subscription(entry, self)

    def _callbacks(self):
        """Return a copy of the subscriber list.

        Iterating the copy keeps a notification round unaffected by entries
        being removed from ``subscribers`` while it runs.
        """
        return list(self.subscribers)

    def _cancel(self, wref):
        """Remove the entry ``wref`` from the subscriber list.

        Raises ValueError (from list.remove) if the entry is not present.
        """
        self.subscribers.remove(wref)
class Observable:
    """An observable implemented as a descriptor.

    Declare it as a class attribute.  Each instance of the owning class
    lazily gets its own holder object (created by calling ``self.observed``)
    stored on the instance under ``"__" + nam``.  Reading the attribute
    returns that holder, so subscribe via ``obj.attr.subscribe(callback)``;
    assigning to the attribute forwards the value to the holder's
    ``setvalu`` method, which notifies subscribers.
    """

    def __init__(self, nam):
        """nam -- attribute name; should match the name the descriptor is bound to."""
        self.xname = "__" + nam
        self.observed = _observed

    def _holder(self, inst):
        """Return inst's holder for this observable, creating it on first use."""
        if not hasattr(inst, self.xname):
            setattr(inst, self.xname, self.observed())
        return getattr(inst, self.xname)

    def __set__(self, inst, value):
        """Forward ``value`` to the instance's holder via its setvalu method."""
        # No truthiness test on ``inst``: an instance that defines __len__
        # or __bool__ may be falsy and must still get its holder.
        self._holder(inst).setvalu(value)

    def __get__(self, inst, klass=None):
        """Return the instance's holder, or the descriptor for class access."""
        if inst is None:
            # Accessed on the class itself: there is no instance to hold
            # state, so hand back the descriptor.
            return self
        return self._holder(inst)
#-----------------------------------------------------------------------
# Example & Simple Test
#-----------------------------------------------------------------------
if __name__ == '__main__':
    import unittest

    class MyClass:
        """Example subject exposing two observables, length and width."""

        # The string passed to Observable should match the attribute name.
        length = Observable('length')
        width = Observable('width')

        def __init__(self):
            self.length.setvalu(0)
            self.width.setvalu(0)

    class MyObserver:
        """Example observer of a MyClass instance.

        Subscribes to the observed object's length and width and records
        the last values seen in ``l`` and ``w``.  It also owns an
        observable of its own, l2.
        """

        l2 = Observable('l2')

        def __init__(self, name, observedObj):
            self.name = name
            self.subscrptn1 = observedObj.length.subscribe(self.print_length)
            self.subscrptn2 = observedObj.width.subscribe(self.print_width)
            # An observable can itself subscribe to an observable, in which
            # case a change chains through both subscription lists: l2's
            # subscribers are notified whenever observedObj.length changes.
            self.subscrptn3 = observedObj.length.subscribe(self.l2)
            self.w = None
            self.l = None

        def print_width(self, value):
            print("%s Observed Width" % self.name, value)
            self.w = value

        def print_length(self, value):
            print("%s Observed Length" % self.name, value)
            self.l = value

        def cancel(self):
            """Drop all three subscriptions held by this observer.

            Rebinding each attribute to None releases the reference to the
            subscription object, whose finalizer then cancels it.
            """
            self.subscrptn1 = None
            self.subscrptn2 = None
            self.subscrptn3 = None

    def PrintLen2(value):
        print("PrintLen2 reports ", value)
        # Exact type check: only plain ints are rejected.
        if type(value) is int:
            raise ValueError("PrintLen2 doesn't want ints.")

    def handlePl2Exceptions(ex):
        print('Handling pl2 exception:', ex, type(ex))
        # Report the exception as handled only if it is exactly a ValueError.
        return type(ex) is ValueError

    class ObserverTestCases(unittest.TestCase):
        # Fixtures are class attributes, so all tests share them and later
        # tests build on the state left by earlier ones.
        area = MyClass()
        kent = MyObserver("Kent", area)
        billy = MyObserver("Billy", area)
        # Chained observer: PrintLen2 is called when billy.l2 changes, and
        # handlePl2Exceptions is the handler for exceptions it raises.
        subscription = billy.l2.subscribe(PrintLen2, handlePl2Exceptions)

        def test_01(self):
            self.area.length = 6
            self.assertEqual(self.kent.l, 6)
            self.assertEqual(self.billy.l, 6)

            self.area.width = 4
            self.assertEqual(self.kent.w, 4)
            self.assertEqual(self.billy.w, 4)

            self.area.length = "Reddish"
            self.assertEqual(self.kent.l, "Reddish")
            self.assertEqual(self.billy.l, "Reddish")

        def test_02(self):
            # Releasing the handle cancels Billy's length subscription only.
            self.billy.subscrptn1 = None
            print("Billy shouldn't report a length change to 5.15.")
            self.area.length = 5.15
            self.assertEqual(self.kent.l, 5.15)
            self.assertEqual(self.billy.l, "Reddish")

        def test_03(self):
            self.billy.cancel()
            print("Billy should no longer report")

            self.area.length = 7
            self.assertEqual(self.kent.l, 7)
            self.assertEqual(self.billy.l, "Reddish")

            self.area.width = 3
            self.assertEqual(self.kent.w, 3)
            self.assertEqual(self.billy.w, 4)  # existing value

            self.assertEqual(self.area.length(), 7)
            self.assertEqual(self.area.width(), 3)

        def test_04(self):
            print("Deleting an object with observables having subscribers is ok")
            # These assignments bind an instance attribute on the test case;
            # the class-level ``area`` above is not rebound by them.
            self.area = None
            self.area = MyClass()
            print("replaced area - no subscribers to this new instance")

            self.area.length = 5
            self.assertEqual(self.kent.l, 7)
            self.assertEqual(self.billy.l, "Reddish")

            self.area.width = "Bluish"
            self.assertEqual(self.kent.w, 3)
            self.assertEqual(self.billy.w, 4)
            # NOTE(review): the source lost its indentation, so it is unclear
            # whether this prompt belongs to test_04 or to module level just
            # before unittest.main() -- confirm against the original recipe.
            c = input("ok? ")

    unittest.main()
|
In the example, MyClass contains two observables, length and width. An observer class is defined. The second argument to the initializer is an instance of MyClass. The initializer subscribes to changes in MyClass's length and width values.
The observer class itself contains an observable, l2. And the observer's init method sets the l2 observable to be a subscriber to the length attribute of the MyClass instance.
In the example two instances of the observer class are created and subscribe to changes of the length and width of the area object.
- ??-Dec-09 Modified to use weak reference in subscriber list.
- 11-Mar-10 Fix problems introduced with using weakref and simplify subscription cancellation. Improve variable names. Simplify examples.
- 12-Mar-10 Fix for when subscribers unsubscribe themselves during a notification, and for when an object containing observables is deleted while having subscribers.
- 13-Mar-10 Improve exception handling, changes for use with threadsafe version, docstring improvements