import weakref import sys import threading # for lock class RecursionError(RuntimeError): pass class _subscription: """A subscription is returned as a result of subscribing to an observable. When the subscription object is finalized, the subscription is cancelled. This class is used to facilitate subscription cancellation.""" def __init__(self, subscriber, observed): self.subscriber = subscriber self.observed = weakref.ref(observed) def __del__(self): obsrvd = self.observed() if (self.subscriber and obsrvd): obsrvd._cancel(self.subscriber) class _observed: '''Half-hidden class. Only 'observable' should construct these. Calls to subscribe, cancel get invoked through the observable. observed objects reside in class instances containing observables.''' def __init__(self): self.subscribers = [] self._value = None self._changeLock = threading.Lock() def __call__(self): """returns the current value, the one last set""" return self._value def _notifySubscribers(self, value): for (f,exceptionHdlr) in self._callbacks(): try: f(value) except Exception as ex: if exceptionHdlr and not exceptionHdlr(ex): raise # reraise if not handled def setvalu(self, value): """Notify the subcribers only when the value changes.""" if self._value != value: if self._changeLock.acquire(0): # non-blocking self._value = value try: self._notifySubscribers(value) finally: self._changeLock.release() else: raise RecursionError("Attempted recursion into observable's set method.") def subscribe(self, obsv, exceptionInfo = None): observer = obsv.setvalu if isinstance(obsv, _observed) else obsv ob_info =(observer, exceptionInfo) self.subscribers.append(ob_info) return _subscription(ob_info, self) def _callbacks(self): scribers = [] scribers.extend(self.subscribers) return scribers def _cancel(self, wref): self.subscribers.remove(wref) class Observable: """An observable implemented as a descriptor. Subscribe to an observable via calling xxx.observable.subscribe(callback)""" def __init__(self, nam): self.xname = "__"+nam self.observed = _observed def __set__(self,inst, value ): """set gets the instances associated variable and calls its setvalu method, which notifies subribers""" if inst and not hasattr(inst, self.xname): setattr(inst, self.xname, self.observed()) ow = getattr(inst, self.xname) ow.setvalu(value) def __get__(self, inst, klass): """get gets the instances associated variable returns it""" if inst and not hasattr(inst, self.xname): setattr(inst, self.xname, self.observed()) return getattr(inst, self.xname) #----------------------------------------------------------------------- # Example & Simple Test #----------------------------------------------------------------------- if __name__ == '__main__': import unittest class MyClass: """ A Class containing the observables length and width""" length = Observable('length') #argument string should match the #variable name width = Observable('width') def __init__(self): self.length.setvalu(0) self.width.setvalu(0) class MyObserver: """An observer class. The initializer is passed an instance of 'myClass' and subscribes to length and width changes. This observer also itself contains an observable, l2""" l2 = Observable('l2') def __init__(self, name, observedObj): self.name = name self.subscrptn1 = observedObj.length.subscribe(self.print_length) self.subscrptn2 = observedObj.width.subscribe(self.print_width) """An observable can subscribe to an observable, in which case a change will chain through both subscription lists. Here l2's subscribers will be notified whenever observedObj.length changes""" self.subscrptn3 = observedObj.length.subscribe(self.l2) self.w = self.l = None def print_width(self, value): print ("%s Observed Width"%self.name, value) self.w = value def print_length(self, value): print ("%s Observed Length"%self.name, value) self.l = value def cancel(self): """Cancels the instances current subscriptions. Setting self.subscrptn1 to None removes the reference to the subscription object, causing it's finalizer (__del__) method to be called.""" self.subscrptn1 = None self.subscrptn2 = None self.subscrptn3 = None def PrintLen2(value): print ("PrintLen2 reports ", value) if type(value) == type(3): raise ValueError("PrintLen2 doesn't want ints.") def handlePl2Exceptions( ex ): print ('Handling pl2 exception:', ex, type(ex)) return ( type(ex)== ValueError ) class ObserverTestCases(unittest.TestCase): area = MyClass() kent = MyObserver("Kent", area) billy = MyObserver("Billy", area) # here we set up a chained observer. PrintLen2 function is called when # billy.l2 changes. handlePl2Exceptions is exception handler for PrintLen2. # if PrintLen2 throws an Exception, handlePl2Exceptions will be called to # handle them. subscription = billy.l2.subscribe(PrintLen2, handlePl2Exceptions) def test_01(self): self.area.length = 6 self.assertEqual(self.kent.l, 6) self.assertEqual(self.billy.l, 6) self.area.width = 4 self.assertEqual(self.kent.w, 4) self.assertEqual(self.billy.w, 4) self.area.length = "Reddish" self.assertEqual(self.kent.l, "Reddish") self.assertEqual(self.billy.l, "Reddish") def test_02(self): self.billy.subscrptn1 = None print ("Billy shouldn't report a length change to 5.15.") self.area.length = 5.15 self.assertEqual(self.kent.l, 5.15) self.assertEqual(self.billy.l, "Reddish") def test_03(self): self.billy.cancel() print ("Billy should no longer report") self.area.length = 7 self.assertEqual(self.kent.l, 7) self.assertEqual(self.billy.l, "Reddish") self.area.width = 3 self.assertEqual(self.kent.w, 3) self.assertEqual(self.billy.w, 4) # existing value self.assertEqual(self.area.length(), 7) self.assertEqual(self.area.width(), 3) def test_04(self): print ("Deleting an object with observables having subscribers is ok") self.area = None self.area = MyClass() print ("replaced area - no subscribers to this new instance") self.area.length = 5 self.assertEqual(self.kent.l,7) self.assertEqual(self.billy.l, "Reddish") self.area.width ="Bluish" self.assertEqual(self.kent.w, 3) self.assertEqual(self.billy.w, 4) c = input("ok? ") unittest.main()