Welcome, guest | Sign In | My Account | Store | Cart

The observer pattern is implemented using an observable descriptor. One creates instances of observable in a class, which allows observers to subscribe to changes in the observable. Assigning a value to the observable causes the suscribers to be notified. An observable can subscribe to another observable, in which case changes to the second propagate to subscribers of the first. The subscribe method returns a Subscription object. When this object is deleted or becomes unreferenced, the subscription is cancelled.

This version compatible with Python 3.0 Example uses unittest to help in understanding the functionality.

Python, 215 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import weakref
import sys
import threading   # for lock

class RecursionError(RuntimeError):
    pass

class _subscription:
    """A subscription is returned as a result of subscribing to an 
       observable. When the subscription object is finalized, the 
       subscription is cancelled.  This class is used to facilitate 
       subscription cancellation."""

    def __init__(self, subscriber, observed):
        self.subscriber = subscriber
        self.observed = weakref.ref(observed)

    def __del__(self):
        obsrvd = self.observed()
        if (self.subscriber and obsrvd):
            obsrvd._cancel(self.subscriber)


class _observed:
    '''Half-hidden class.  Only 'observable' should construct these.
    Calls to subscribe, cancel get invoked through the observable.
    observed objects reside in class instances containing observables.'''

    def __init__(self):
        self.subscribers = []
        self._value = None
        self._changeLock = threading.Lock()
        
    def __call__(self):
        """returns the current value, the one last set"""
        return self._value

    def _notifySubscribers(self, value):
        for (f,exceptionHdlr) in self._callbacks():
            try:
                f(value)
            except Exception as ex:
                if exceptionHdlr and not exceptionHdlr(ex):
                    raise            # reraise if not handled

    def setvalu(self, value):
        """Notify the subcribers only when the value changes."""
        if self._value != value:
            if self._changeLock.acquire(0):     # non-blocking
                self._value = value
                try:
                    self._notifySubscribers(value)
                finally:
                    self._changeLock.release()
            else:
                raise RecursionError("Attempted recursion into observable's set method.")

    def subscribe(self, obsv, exceptionInfo = None):
        observer = obsv.setvalu if isinstance(obsv, _observed) else obsv
        ob_info =(observer, exceptionInfo)
        self.subscribers.append(ob_info)
        return _subscription(ob_info, self)

    def _callbacks(self):
       scribers = []
       scribers.extend(self.subscribers)
       return scribers

    def _cancel(self, wref):
        self.subscribers.remove(wref)


class Observable:
    """An observable implemented as a descriptor. Subscribe to an observable 
    via calling  xxx.observable.subscribe(callback)"""
    def __init__(self, nam):
        self.xname = "__"+nam
        self.observed = _observed

    def __set__(self,inst, value ):
        """set gets the instances associated variable and calls 
        its setvalu method, which notifies subribers"""
        if inst and not hasattr(inst, self.xname):
            setattr(inst, self.xname, self.observed())
        ow = getattr(inst, self.xname)
        ow.setvalu(value)

    def __get__(self, inst, klass):
        """get gets the instances associated variable returns it"""
        if inst and not hasattr(inst, self.xname):
            setattr(inst, self.xname, self.observed())
        return getattr(inst, self.xname)

#-----------------------------------------------------------------------
#   Example & Simple Test
#-----------------------------------------------------------------------
if __name__ == '__main__':
    import unittest
    
    class MyClass:
        """ A Class containing the observables length and width"""
        length = Observable('length')   #argument string should match the
                                        #variable name
        width = Observable('width')

        def __init__(self):
            self.length.setvalu(0)
            self.width.setvalu(0)
            

    class MyObserver:
        """An observer class. The initializer is passed an instance
           of 'myClass' and subscribes to length and width changes.
           This observer also itself contains an observable, l2"""
        
        l2 = Observable('l2')

        def __init__(self, name, observedObj):
            self.name = name
            self.subscrptn1 = observedObj.length.subscribe(self.print_length)
            self.subscrptn2 = observedObj.width.subscribe(self.print_width)
            
            """An observable can subscribe to an observable, in which case
              a change will chain through both subscription lists.
              Here l2's subscribers will be notified whenever observedObj.length
              changes"""
            self.subscrptn3 = observedObj.length.subscribe(self.l2)
            self.w = self.l = None

        def print_width(self, value):
            print ("%s Observed Width"%self.name, value)
            self.w = value

        def print_length(self, value):
            print ("%s Observed Length"%self.name, value)
            self.l = value
            
        def cancel(self):
            """Cancels the instances current subscriptions. Setting self.subscrptn1
            to None removes the reference to the subscription object, causing it's 
            finalizer (__del__) method to be called."""
            self.subscrptn1 = None
            self.subscrptn2 = None
            self.subscrptn3 = None

    def PrintLen2(value):
        print ("PrintLen2 reports ", value)
        if type(value) == type(3):
            raise ValueError("PrintLen2 doesn't want ints.")

    def handlePl2Exceptions( ex ):
        print ('Handling pl2 exception:', ex, type(ex))
        return ( type(ex)== ValueError )
       

    class ObserverTestCases(unittest.TestCase):
      area = MyClass()
      kent = MyObserver("Kent", area)
      billy = MyObserver("Billy", area)

      # here we set up a chained observer. PrintLen2 function is called when
      # billy.l2 changes. handlePl2Exceptions is exception handler for PrintLen2.
      # if PrintLen2 throws an Exception, handlePl2Exceptions will be called to
      # handle them.
    
      subscription = billy.l2.subscribe(PrintLen2, handlePl2Exceptions)

      def test_01(self):
        self.area.length = 6
        self.assertEqual(self.kent.l, 6)
        self.assertEqual(self.billy.l, 6)
        
        self.area.width = 4
        self.assertEqual(self.kent.w, 4)
        self.assertEqual(self.billy.w, 4)
        
        self.area.length = "Reddish"
        self.assertEqual(self.kent.l, "Reddish")
        self.assertEqual(self.billy.l, "Reddish")

      def test_02(self):
        self.billy.subscrptn1 = None
        print ("Billy shouldn't report a length change to 5.15.")
        self.area.length = 5.15
        self.assertEqual(self.kent.l, 5.15)
        self.assertEqual(self.billy.l, "Reddish")
        
      def test_03(self):
        self.billy.cancel()
        print ("Billy should no longer report")
        self.area.length = 7
        self.assertEqual(self.kent.l, 7)
        self.assertEqual(self.billy.l, "Reddish")
        self.area.width = 3
        self.assertEqual(self.kent.w, 3)
        self.assertEqual(self.billy.w, 4) # existing value

        self.assertEqual(self.area.length(), 7)
        self.assertEqual(self.area.width(), 3)

      def test_04(self):
        print ("Deleting an object with observables having subscribers is ok")
        self.area = None
        self.area = MyClass()
        print ("replaced area - no subscribers to this new instance")
        self.area.length = 5
        self.assertEqual(self.kent.l,7)
        self.assertEqual(self.billy.l, "Reddish")
        self.area.width ="Bluish"
        self.assertEqual(self.kent.w, 3)
        self.assertEqual(self.billy.w, 4)


    c = input("ok? ")
    unittest.main()

In the example, MyClass contains two observables, length and width. An observer class is defined. The second argument to the initializer is an instance of MyClass. The initializer subscribes to changes in myClass's length and width values.

The observer class itself contains an observable, l2. And the observer's init method sets the l2 observable to be a subscriber to the length attribute of the myClass instance.

In the example two instances of the observer class are created and subscribe to changes of the length and width of the area object.

  • ??-Dec-09 Modified to use weak reference in subscriber list.
  • 11-Mar-10 Fix problems introduced with using weakref and simplify subscription cancellation. Improve variable names. Simplify examples.
  • 12-Mar-10 Fix for when subscribers unsubsribe themselves during a notification and for when object containing observables is deleted while having subscribers
  • 13-Mar-10 Improve exception handling, changes for use with threadsafe version, docstring improvements