Welcome, guest | Sign In | My Account | Store | Cart
import weakref
import sys
import threading   # for lock

class RecursionError(RuntimeError):
    pass

class _subscription:
    """A subscription is returned as a result of subscribing to an 
       observable. When the subscription object is finalized, the 
       subscription is cancelled.  This class is used to facilitate 
       subscription cancellation."""

    def __init__(self, subscriber, observed):
        self.subscriber = subscriber
        self.observed = weakref.ref(observed)

    def __del__(self):
        obsrvd = self.observed()
        if (self.subscriber and obsrvd):
            obsrvd._cancel(self.subscriber)


class _observed:
    '''Half-hidden class.  Only 'observable' should construct these.
    Calls to subscribe, cancel get invoked through the observable.
    observed objects reside in class instances containing observables.'''

    def __init__(self):
        self.subscribers = []
        self._value = None
        self._changeLock = threading.Lock()
        
    def __call__(self):
        """returns the current value, the one last set"""
        return self._value

    def _notifySubscribers(self, value):
        for (f,exceptionHdlr) in self._callbacks():
            try:
                f(value)
            except Exception as ex:
                if exceptionHdlr and not exceptionHdlr(ex):
                    raise            # reraise if not handled

    def setvalu(self, value):
        """Notify the subcribers only when the value changes."""
        if self._value != value:
            if self._changeLock.acquire(0):     # non-blocking
                self._value = value
                try:
                    self._notifySubscribers(value)
                finally:
                    self._changeLock.release()
            else:
                raise RecursionError("Attempted recursion into observable's set method.")

    def subscribe(self, obsv, exceptionInfo = None):
        observer = obsv.setvalu if isinstance(obsv, _observed) else obsv
        ob_info =(observer, exceptionInfo)
        self.subscribers.append(ob_info)
        return _subscription(ob_info, self)

    def _callbacks(self):
       scribers = []
       scribers.extend(self.subscribers)
       return scribers

    def _cancel(self, wref):
        self.subscribers.remove(wref)


class Observable:
    """An observable implemented as a descriptor. Subscribe to an observable 
    via calling  xxx.observable.subscribe(callback)"""
    def __init__(self, nam):
        self.xname = "__"+nam
        self.observed = _observed

    def __set__(self,inst, value ):
        """set gets the instances associated variable and calls 
        its setvalu method, which notifies subribers"""
        if inst and not hasattr(inst, self.xname):
            setattr(inst, self.xname, self.observed())
        ow = getattr(inst, self.xname)
        ow.setvalu(value)

    def __get__(self, inst, klass):
        """get gets the instances associated variable returns it"""
        if inst and not hasattr(inst, self.xname):
            setattr(inst, self.xname, self.observed())
        return getattr(inst, self.xname)

#-----------------------------------------------------------------------
#   Example & Simple Test
#-----------------------------------------------------------------------
if __name__ == '__main__':
    import unittest
    
    class MyClass:
        """ A Class containing the observables length and width"""
        length = Observable('length')   #argument string should match the
                                        #variable name
        width = Observable('width')

        def __init__(self):
            self.length.setvalu(0)
            self.width.setvalu(0)
            

    class MyObserver:
        """An observer class. The initializer is passed an instance
           of 'myClass' and subscribes to length and width changes.
           This observer also itself contains an observable, l2"""
        
        l2 = Observable('l2')

        def __init__(self, name, observedObj):
            self.name = name
            self.subscrptn1 = observedObj.length.subscribe(self.print_length)
            self.subscrptn2 = observedObj.width.subscribe(self.print_width)
            
            """An observable can subscribe to an observable, in which case
              a change will chain through both subscription lists.
              Here l2's subscribers will be notified whenever observedObj.length
              changes"""
            self.subscrptn3 = observedObj.length.subscribe(self.l2)
            self.w = self.l = None

        def print_width(self, value):
            print ("%s Observed Width"%self.name, value)
            self.w = value

        def print_length(self, value):
            print ("%s Observed Length"%self.name, value)
            self.l = value
            
        def cancel(self):
            """Cancels the instances current subscriptions. Setting self.subscrptn1
            to None removes the reference to the subscription object, causing it's 
            finalizer (__del__) method to be called."""
            self.subscrptn1 = None
            self.subscrptn2 = None
            self.subscrptn3 = None

    def PrintLen2(value):
        print ("PrintLen2 reports ", value)
        if type(value) == type(3):
            raise ValueError("PrintLen2 doesn't want ints.")

    def handlePl2Exceptions( ex ):
        print ('Handling pl2 exception:', ex, type(ex))
        return ( type(ex)== ValueError )
       

    class ObserverTestCases(unittest.TestCase):
      area = MyClass()
      kent = MyObserver("Kent", area)
      billy = MyObserver("Billy", area)

      # here we set up a chained observer. PrintLen2 function is called when
      # billy.l2 changes. handlePl2Exceptions is exception handler for PrintLen2.
      # if PrintLen2 throws an Exception, handlePl2Exceptions will be called to
      # handle them.
    
      subscription = billy.l2.subscribe(PrintLen2, handlePl2Exceptions)

      def test_01(self):
        self.area.length = 6
        self.assertEqual(self.kent.l, 6)
        self.assertEqual(self.billy.l, 6)
        
        self.area.width = 4
        self.assertEqual(self.kent.w, 4)
        self.assertEqual(self.billy.w, 4)
        
        self.area.length = "Reddish"
        self.assertEqual(self.kent.l, "Reddish")
        self.assertEqual(self.billy.l, "Reddish")

      def test_02(self):
        self.billy.subscrptn1 = None
        print ("Billy shouldn't report a length change to 5.15.")
        self.area.length = 5.15
        self.assertEqual(self.kent.l, 5.15)
        self.assertEqual(self.billy.l, "Reddish")
        
      def test_03(self):
        self.billy.cancel()
        print ("Billy should no longer report")
        self.area.length = 7
        self.assertEqual(self.kent.l, 7)
        self.assertEqual(self.billy.l, "Reddish")
        self.area.width = 3
        self.assertEqual(self.kent.w, 3)
        self.assertEqual(self.billy.w, 4) # existing value

        self.assertEqual(self.area.length(), 7)
        self.assertEqual(self.area.width(), 3)

      def test_04(self):
        print ("Deleting an object with observables having subscribers is ok")
        self.area = None
        self.area = MyClass()
        print ("replaced area - no subscribers to this new instance")
        self.area.length = 5
        self.assertEqual(self.kent.l,7)
        self.assertEqual(self.billy.l, "Reddish")
        self.area.width ="Bluish"
        self.assertEqual(self.kent.w, 3)
        self.assertEqual(self.billy.w, 4)


    c = input("ok? ")
    unittest.main()

History