This recipe provides decorator classes that can be used to transparently provide synchronization and access control to your resources.
# Synchronization classes using decorators. Provides synchronized, semaphore
# and event classes which provide transparent decorator patterns for
# Lock, BoundedSemaphore and Event objects in Python.
from threading import Thread, Lock, BoundedSemaphore, Event, currentThread
from time import sleep
from random import random
class synchronized(object):
    """Decorator class encapsulating a lock and a function.

    Applying ``@synchronized()`` to a function returns a wrapper that
    holds this instance's ``Lock`` for the whole duration of each call,
    so calls made through the same decorator instance never overlap.
    """

    def __init__(self, *args):
        # Positional arguments are accepted but ignored; the decorator is
        # always applied with call syntax: ``@synchronized()``.
        self.lock = Lock()

    def __call__(self, f):
        """Return a wrapper around *f* that runs it with the lock held.

        The wrapper forwards all arguments to *f*, returns its result and
        lets any exception propagate after the lock has been released.
        """
        def lockedfunc(*args, **kwargs):
            # Acquire *before* entering the try block: if acquire() itself
            # fails we must not run the finally clause and release a lock
            # this thread never obtained.
            self.lock.acquire()
            try:
                print('Acquired lock=> %s' % (currentThread(),))
                return f(*args, **kwargs)
            finally:
                self.lock.release()
                print('Released lock=> %s' % (currentThread(),))

        return lockedfunc
class semaphore(object):
    """Decorator class encapsulating a semaphore to limit the number of
    simultaneous calls to the wrapped function.

    ``@semaphore(n)`` allows at most ``n`` calls made through the same
    decorator instance to run at once.
    """

    def __init__(self, *args):
        # Only the first positional argument is used (the semaphore's
        # initial value).  When it is omitted, BoundedSemaphore's own
        # default of 1 applies instead of failing with an IndexError.
        self.sem = BoundedSemaphore(*args[:1])

    def __call__(self, f):
        """Return a wrapper around *f* that runs it holding one slot.

        The wrapper forwards all arguments to *f*, returns its result and
        lets any exception propagate after the slot has been released.
        """
        def semfunc(*args, **kwargs):
            print('Trying to acquire sem=> %s' % (currentThread(),))
            # Acquire outside the try block: releasing a BoundedSemaphore
            # that was never acquired raises ValueError, which would mask
            # the original failure.
            self.sem.acquire()
            try:
                print('Acquired sem=> %s' % (currentThread(),))
                return f(*args, **kwargs)
            finally:
                self.sem.release()
                print('Released sem=> %s' % (currentThread(),))

        return semfunc
class event(object):
    """Decorator class encapsulating an event object to control
    sequential access to a resource.

    The event starts out set.  A caller waits for it, clears it for the
    duration of the wrapped call and sets it again afterwards, which lets
    the next waiting caller proceed.
    """

    def __init__(self, *args):
        # Positional arguments are accepted but ignored.
        self.evt = Event()
        self.evt.set()
        # Event.wait() followed by Event.clear() is not atomic: without
        # this lock several threads can all return from wait() before any
        # of them clears the event, and then run the function together.
        self._turn = Lock()

    def __call__(self, f):
        """Return a wrapper around *f* that runs one call at a time.

        The wrapper forwards all arguments to *f*, returns its result and
        lets any exception propagate after the event has been set again.
        """
        def eventfunc(*args, **kwargs):
            print('Waiting on event => %s' % (currentThread(),))
            # Only the holder of _turn may wait-and-clear, so exactly one
            # thread claims the event each time it is set.
            self._turn.acquire()
            try:
                self.evt.wait()
                self.evt.clear()
            finally:
                self._turn.release()
            # The event is set again only by a thread that actually
            # cleared it; a failed wait above never reaches this point.
            try:
                print('Cleared event => %s' % (currentThread(),))
                return f(*args, **kwargs)
            finally:
                # Wake up another thread...
                self.evt.set()
                print('Set event=> %s' % (currentThread(),))

        return eventfunc
##############################################################################
# Test Code #
##############################################################################
# Demonstrating the synchronization classes...
# Use a global list
# list() makes this a real, appendable list on Python 3 as well, where
# range() returns a lazy range object with no append() method.
l = list(range(10))


def reset():
    """Rebind the module-level list ``l`` to a fresh list of 0..9."""
    global l
    l = list(range(10))
# Not thread-safe
def func1(begin, end):
    """Append each integer in ``range(begin, end)`` to the global list ``l``.

    A random pause of up to half a second precedes every append.  No
    locking is done here.
    """
    for value in range(begin, end):
        delay = 0.5 * random()
        sleep(delay)
        l.append(value)
# Thread-safe!
@synchronized()
def func2(begin, end):
    """Append each integer in ``range(begin, end)`` to the global list ``l``.

    A random pause of up to half a second precedes every append.  The
    whole call runs under the ``synchronized`` decorator applied above.
    """
    for value in range(begin, end):
        delay = 0.5 * random()
        sleep(delay)
        l.append(value)
# Limited access, thread-safe
class DBConnection(object):
    """ A dummy db connection class """

    # Upper bound on the number of connect() calls running at once.
    MAX = 5

    # We want to limit the number of DB connections to MAX
    # at a given time.  The decorator is instantiated once, when the
    # class body runs, so the limit is shared by every instance.
    @semaphore(MAX)
    def connect(self, host):
        """Pretend to connect to *host* (unused) by sleeping 3 seconds."""
        print("Connecting... %s" % (currentThread(),))
        # Sleep for some time
        sleep(3.0)

    # We want sequential access to this function
    @event()
    def connect2(self, host):
        """Pretend to connect to *host* (unused) by sleeping 3 seconds."""
        print("Connecting... %s" % (currentThread(),))
        # Sleep for some time
        sleep(3.0)
class PrintMsg(object):
    """Mixin that prints start/end messages tagged with the class name."""

    def startmsg(self):
        """Print '<ClassName> started...' for the instance's class."""
        print('%s started...' % self.__class__.__name__)

    def endmsg(self):
        """Print '<ClassName> ended...' for the instance's class."""
        print('%s ended...' % self.__class__.__name__)
class BaseThread(Thread, PrintMsg):
    """Thread base class that also inherits the PrintMsg message helpers."""
class MyThread1(BaseThread):
    """Thread that calls func1 for the range 10..19."""

    def run(self):
        first, stop = 10, 20
        self.startmsg()
        func1(first, stop)
        self.endmsg()
class MyThread2(BaseThread):
    """Thread that calls func1 for the range 20..29."""

    def run(self):
        first, stop = 20, 30
        self.startmsg()
        func1(first, stop)
        self.endmsg()
class MyThread3(BaseThread):
    """Thread that calls func2 for the range 10..19."""

    def run(self):
        first, stop = 10, 20
        self.startmsg()
        func2(first, stop)
        self.endmsg()
class MyThread4(BaseThread):
    """Thread that calls func2 for the range 20..29."""

    def run(self):
        first, stop = 20, 30
        self.startmsg()
        func2(first, stop)
        self.endmsg()
class DBThread(BaseThread):
    """Thread that creates a DBConnection and calls connect('localhost')."""

    def run(self):
        DBConnection().connect('localhost')
class DBThread2(BaseThread):
    """Thread that creates a DBConnection and calls connect2('localhost')."""

    def run(self):
        DBConnection().connect2('localhost')
print('Starting the lock test...')
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# List will not have elements in order
print(l)
reset()
t3 = MyThread3()
t4 = MyThread4()
t3.start()
t4.start()
t3.join()
t4.join()
# List will have elements in order
print(l)
sleep(3.0)
print('Starting the sem test...')
# Sem test, init 8 threads and call connect
# on the DBConnection object...
sem_threads = []
for x in range(8):
    t = DBThread()
    t.start()
    sem_threads.append(t)
# Wait for every connection to finish.  A fixed sleep is not enough here:
# the threads are started together but each connect() call sleeps for
# 3 seconds, so the event test below would otherwise overlap this one.
for t in sem_threads:
    t.join()
print('Starting event test..')
# Event test, init 8 threads and call connect2
# on the DBConnection object...
event_threads = []
for x in range(8):
    t = DBThread2()
    t.start()
    event_threads.append(t)
# Join before announcing completion so the final message is truthful.
for t in event_threads:
    t.join()
print('All tests completed.')
###############################################################################
# End of test code #
###############################################################################
|
This recipe provides advanced synchronization classes using decorators. It can be used to seamlessly control access to your resources (classes, functions, objects) with very little code.
There is a similar recipe at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/465057, which provides a basic synchronized(...) function as a decorator. This recipe uses a similar concept, but provides synchronization classes rather than functions by overriding the __call__ method. It also provides additional functionality for semaphores and events.
Good point. I missed it because I was not thinking about return values at all; I was focused on getting the argument passing right. I have updated the recipe with this change.