Welcome, guest | Sign In | My Account | Store | Cart

This recipe provides decorator classes that can be used to transparently provide synchronization and access control to your resources.

Python, 229 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# Synchronization classes using decorators. Provides synchronized, semaphore
# and event classes which provide transparent decorator patterns for
# Lock, BoundedSemaphore and Event objects in Python.

from threading import Thread, Lock, BoundedSemaphore, Event, currentThread
from time import sleep
from random import random
    
class synchronized(object):
    """ Class enapsulating a lock and a function
    allowing it to be used as a synchronizing
    decorator making the wrapped function
    thread-safe """
    
    def __init__(self, *args):
        self.lock = Lock()
        
    def __call__(self, f):
        def lockedfunc(*args, **kwargs):
            try:
                self.lock.acquire()
                print 'Acquired lock=>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                self.lock.release()
                print 'Released lock=>',currentThread()

        return lockedfunc


class semaphore(object):
    """ Class encapsulating a semaphore to limit
    number of resources  """

    def __init__(self, *args):
        self.sem = BoundedSemaphore(args[0])
    
    def __call__(self, f):
        def semfunc(*args, **kwargs):
            try:
                print 'Trying to acquire sem=>',currentThread()
                self.sem.acquire()
                print 'Acquired sem=>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                self.sem.release()
                print 'Released sem=>',currentThread()

        
        return semfunc

class event(object):
    """ Class encapsulating an event object to control
    sequential access to a resource """

    def __init__(self, *args):
        self.evt = Event()
        self.evt.set()
    
    def __call__(self, f):
        def eventfunc(*args, **kwargs):
            try:
                print 'Waiting on event =>',currentThread()
                self.evt.wait()
                # First thread will clear the event and
                # make others wait, once it is done with the
                # job, it sets the event which wakes up
                # another thread, which does the same thing...
                # This provides sequential access to a
                # resource...
                self.evt.clear()
                print 'Cleared event =>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                # Wake up another thread...
                self.evt.set()
                print 'Set event=>',currentThread()

        return eventfunc

##############################################################################
# Test Code                                                                  #
##############################################################################
# Demonstrating the synchronization classes...
# Use a global list

l=range(10)

def reset():
    global l
    l = range(10)

# Not thread-safe        
def func1(begin, end):
    for x in range(begin, end):
        sleep(random()*0.5)
        l.append(x)

# Thread-safe!
@synchronized()
def func2(begin, end):
    for x in range(begin, end):
        sleep(random()*0.5)        
        l.append(x)


# Limited access, thread-safe
class DBConnection(object):
    """ A dummy db connection class """

    MAX = 5
    # We want to limit the number of DB connections to MAX
    # at a given time
    @semaphore(MAX)
    def connect(self, host):
        print "Connecting...",currentThread()
        # Sleep for some time
        sleep(3.0)
        pass

    # We want sequential access to this function
    @event()
    def connect2(self, host):
        print "Connecting...",currentThread()
        # Sleep for some time
        sleep(3.0)
        pass    
    

class PrintMsg(object):
    def startmsg(self):
        print '%s started...' % self.__class__.__name__
    def endmsg(self):
        print '%s ended...' % self.__class__.__name__        

class BaseThread(Thread, PrintMsg):
    pass

class MyThread1(BaseThread):
    def run(self):
        self.startmsg()
        func1(10, 20)
        self.endmsg()        

class MyThread2(BaseThread):
    def run(self):
        self.startmsg()        
        func1(20, 30)
        self.endmsg()

class MyThread3(BaseThread):
    def run(self):
        self.startmsg()
        func2(10, 20)
        self.endmsg()        

class MyThread4(BaseThread):
    def run(self):
        self.startmsg()        
        func2(20, 30)
        self.endmsg()                

class DBThread(BaseThread):
    def run(self):
        db = DBConnection()
        db.connect('localhost')


class DBThread2(BaseThread):
    def run(self):
        db = DBConnection()
        db.connect2('localhost')


print 'Starting the lock test...'

t1 = MyThread1()
t2 = MyThread2()

t1.start(); t2.start()
t1.join(); t2.join()

# List will not have elements in order
print l

reset()

t3 = MyThread3()
t4 = MyThread4()

t3.start(); t4.start()
t3.join(); t4.join()

# List will have elements in order
print l


sleep(3.0)

print 'Starting the sem test...'
# Sem test, init 8 threads and call connect
# on the DBConnection object...
for x in range(8):
   t = DBThread()
   t.start()#

sleep(3.0)

print 'Starting event test..'

# Event test, init 8 threads and 
# increment counter
for x in range(8):
    t = DBThread2()
    t.start()

print 'All tests completed.'
###############################################################################
#  End of test code                                                           #
###############################################################################

This recipe provides advanced synchronization classes using decorators. It can be used to seamlessly control access to your resources (classes, functions, objects) with very little code.

There is a similar recipe at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/465057, which provides a basic synchronized(...) function as a decorator. This recipe uses a similar concept, but provides synchronization classes rather than functions by overriding the __call__ method. It also provides additional functionality for semaphores and events.

2 comments

Darren Redmond 16 years, 6 months ago  # | flag
Anand,

One thing I noticed when using this module is that the calls in each of the decorators need to return the result of the function.

So :

f(*args, **kwargs)

becomes :

return f(*args, **kwargs)

for each of the 3 decorators.

Now I was able to apply it to any function as opposed to functions with None response types.

cheers
Darren
Anand (author) 16 years, 6 months ago  # | flag

Good point. Good point... I missed it since I was not thinking about return values at all; was focusing to get the argument passing right. I have updated the recipe with this change.