
This is a thread-safe version of recipe 576979. A publish-subscribe (observer) pattern is implemented as a descriptor; assigning a value notifies the observers. It uses recipe 577105, saved as synchlock.py, and recipe 576979, saved as Observer.py.

Python, 100 lines
from Observer import _obwan, Observable
import threading
from synchlock import synchronous

class _tsobwan(_obwan):
    """Subclassed _obwan to provide thread synchronization"""

    def __init__(self):
        _obwan.__init__(self)
        self._synchLock = threading.RLock()

    setvalu = synchronous('_synchLock')(_obwan.setvalu)
    subscribe = synchronous('_synchLock')(_obwan.subscribe)
    _callbacks = synchronous('_synchLock')(_obwan._callbacks)
    _cancel = synchronous('_synchLock')(_obwan._cancel)

class TSObservable(Observable):
    """Subclassed to provide thread synchronization"""
    
    def __init__(self, nam):
        self.xname = "__"+nam
        self.obwan = _tsobwan

# -------------------------------------------------------------------------
# example case

if __name__ == "__main__":

    class MyClass(object):
        """ A Class containing the observables chancellor and width"""
        chancellor = TSObservable('chancellor')
        width = TSObservable('width')

        def __init__(self):
            self.chancellor.setvalu(0)
            self.width.setvalu(0)


    class MyObserver(object):
        """Provides Observers for above"""

        def __init__(self, name, observedObj):
            self.name = name
            self.subs1 = observedObj.chancellor.subscribe(self.print_c)
            self.subs2 = observedObj.width.subscribe(self.print_w)
            self.timesToReport = len(name)

        def print_c(self, value):
            print("%s observed change %s" % (self.name, value))
            self.timesToReport -= 1
            if self.timesToReport == 0:
                # dropping the subscription object cancels the subscription
                print("  cancelling my subscription")
                self.subs1 = None

        def print_w(self, value):
            print("%s observed value %s" % (self.name, value))


    def report2(value):
        print("Report 2 observed value of %s" % (value,))

    import time   # threading is already imported at module level
    def onRun(observed, name):
        obsver = MyObserver(name, observed)
        isrunning = True
        for j in range(400):
            time.sleep(.10)
            if isrunning and obsver.timesToReport==0:
                isrunning = False
                observed.chancellor = obsver.name

    # ----------------------------------------------------
    obnames = """Bob AllisonJane Karl Mujiburami Becky Geraldine
       Johnny Barbarah Matthew""".split()

    area = MyClass()
    rptsbscrbr = area.width.subscribe(report2)

    thrds = [ threading.Thread(target=onRun, name=n, args=(area,n))
                for n in obnames]

    for thrd in thrds:
        thrd.start()

    # lots of reports on changes to width
    area.width = 4
    area.width = "reddish"
    # lots of reports on changes to chancellor
    area.chancellor = 1.5
    area.chancellor = 9
    print(" ")
    time.sleep(4.0)
    # Recursing starts as threads cancel their subscriptions
    area.chancellor = "Amy"
    # wait awhile...
    time.sleep(4)
    area.width = 7.1

    # wait for the observer threads to finish
    for thrd in thrds:
        thrd.join()

TSObservable implements a thread-safe observer pattern. Because a TSObservable is a descriptor, it is added at the class level. To subscribe to changes, call instance.observable.subscribe(observer, [exception handler]), which returns a subscription object. The subscription lasts as long as there is a reference to the subscription object; the object's __del__ method cancels the subscription.
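
The subscription lifetime described above can be pictured with a small stand-alone sketch. It does not use Observer.py: the names Topic and Subscription are hypothetical, and the example assumes CPython, where reference counting runs __del__ as soon as the last reference is dropped.

```python
class Subscription(object):
    """Hypothetical handle that cancels itself when it is garbage collected."""

    def __init__(self, topic, callback):
        self._topic = topic
        self._callback = callback

    def __del__(self):
        self._topic._cancel(self._callback)


class Topic(object):
    """Hypothetical stand-in for an observable value."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)
        return Subscription(self, callback)

    def _cancel(self, callback):
        if callback in self._callbacks:
            self._callbacks.remove(callback)

    def publish(self, value):
        # iterate over a copy so cancelling during delivery is safe
        for callback in list(self._callbacks):
            callback(value)


seen = []
topic = Topic()
handle = topic.subscribe(seen.append)
topic.publish(1)    # delivered: the handle is still referenced
handle = None       # last reference dropped, so __del__ cancels (CPython)
topic.publish(2)    # not delivered
```

This is why the example above stores the results of subscribe() in self.subs1 and self.subs2: discarding the return value would end the subscription at once.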

The observer function takes one argument: the value the observable was set to. Exceptions raised by the observer can be handled by the optional exception handler, which is called with the exception as its argument. The handler should return false if it did not handle the exception or wants the exception re-raised.
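
A minimal sketch of that handler contract, independent of Observer.py. The notify function and the (observer, handler) pairing are assumptions made for illustration, not the recipe's internals.

```python
def notify(observers, value):
    """Call each (observer, handler) pair; handler may be None."""
    for observer, handler in observers:
        try:
            observer(value)
        except Exception as exc:
            # No handler, or a false return value: let the exception propagate.
            if handler is None or not handler(exc):
                raise


log = []

def bad_observer(value):
    raise ValueError("cannot accept %r" % (value,))

def swallow(exc):
    log.append("handled: %s" % exc)
    return True     # handled, do not re-raise

def refuse(exc):
    log.append("refused: %s" % exc)
    return False    # ask for the exception to be re-raised

notify([(bad_observer, swallow)], 3)    # exception is absorbed

try:
    notify([(bad_observer, refuse)], 4)
except ValueError:
    log.append("re-raised")
```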

If the observer, or a method called by it, attempts to change the observed value, a RecursionError is raised.
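
One way such a guard could work is a flag that is set while observers are being notified. The sketch below is an assumption about the general technique, not the code in Observer.py; GuardedValue and RecursiveSetError are hypothetical names, the latter standing in for the recipe's RecursionError.

```python
class RecursiveSetError(RuntimeError):
    """Hypothetical stand-in for the recipe's RecursionError."""


class GuardedValue(object):
    """Hypothetical observable that refuses to be set while notifying."""

    def __init__(self):
        self.value = None
        self._observers = []
        self._notifying = False

    def subscribe(self, observer):
        self._observers.append(observer)

    def setvalu(self, value):
        if self._notifying:
            raise RecursiveSetError("value changed from inside an observer")
        self.value = value
        self._notifying = True
        try:
            for observer in list(self._observers):
                observer(value)
        finally:
            # always clear the flag, even if an observer raised
            self._notifying = False


cell = GuardedValue()
errors = []

def meddler(value):
    try:
        cell.setvalu(value + 1)     # tries to change the value it observes
    except RecursiveSetError as exc:
        errors.append(str(exc))

cell.subscribe(meddler)
cell.setvalu(10)
```

A reentrant lock alone would not catch this case, since an RLock lets the thread that already holds it acquire it again.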

This recipe provides thread safety to the Observable in recipe 576979 by using the synchronous decorator in recipe 577105.
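
For readers without recipe 577105 at hand, the following sketch shows how a decorator with the same call shape, synchronous('_synchLock')(method), might be written. It is an assumption based only on how the decorator is used above; the real synchlock.py may differ. The Counter class is hypothetical.

```python
import functools
import threading


def synchronous(lock_name):
    """Sketch of a decorator factory: run the method while holding the
    lock stored on the instance under the attribute lock_name."""
    def decorate(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            with getattr(self, lock_name):
                return method(self, *args, **kwargs)
        return wrapper
    return decorate


class Counter(object):
    def __init__(self):
        self._synchLock = threading.RLock()
        self.count = 0

    def _bump(self):
        current = self.count
        self.count = current + 1

    # Wrap an existing function after the fact, as _tsobwan does.
    bump = synchronous('_synchLock')(_bump)


counter = Counter()

def work():
    for _ in range(1000):
        counter.bump()

threads = [threading.Thread(target=work) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Looking the lock up by attribute name at call time means each instance is guarded by its own lock, which matches _tsobwan creating _synchLock in __init__.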

2 comments

John K Luebs 13 years, 9 months ago

TSObservable isn't a descriptor. In Python, a descriptor follows a protocol that requires, at a minimum, a __get__ method.

Rodney Drenth (author) 13 years, 4 months ago

The __get__ and __set__ methods are implemented in the parent class of TSObservable.
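
The point in this exchange can be checked with a short stand-alone example: a subclass that defines neither __get__ nor __set__ still behaves as a descriptor when its parent defines them. Base, Derived and Holder are hypothetical names, not the classes in Observer.py.

```python
class Base(object):
    """Hypothetical parent that implements the descriptor protocol."""

    def __init__(self, name):
        self.storage = "_" + name

    def __get__(self, instance, owner):
        if instance is None:
            return self
        return getattr(instance, self.storage, None)

    def __set__(self, instance, value):
        setattr(instance, self.storage, value)


class Derived(Base):
    """Defines neither __get__ nor __set__, yet is still a descriptor."""


class Holder(object):
    width = Derived("width")


h = Holder()
h.width = 4     # routed through the inherited Base.__set__
```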