# Synchronization classes using decorators. Provides synchronized, semaphore # and event classes which provide transparent decorator patterns for # Lock, BoundedSemaphore and Event objects in Python. from threading import Thread, Lock, BoundedSemaphore, Event, currentThread from time import sleep from random import random class synchronized(object): """ Class enapsulating a lock and a function allowing it to be used as a synchronizing decorator making the wrapped function thread-safe """ def __init__(self, *args): self.lock = Lock() def __call__(self, f): def lockedfunc(*args, **kwargs): try: self.lock.acquire() print 'Acquired lock=>',currentThread() try: return f(*args, **kwargs) except Exception, e: raise finally: self.lock.release() print 'Released lock=>',currentThread() return lockedfunc class semaphore(object): """ Class encapsulating a semaphore to limit number of resources """ def __init__(self, *args): self.sem = BoundedSemaphore(args[0]) def __call__(self, f): def semfunc(*args, **kwargs): try: print 'Trying to acquire sem=>',currentThread() self.sem.acquire() print 'Acquired sem=>',currentThread() try: return f(*args, **kwargs) except Exception, e: raise finally: self.sem.release() print 'Released sem=>',currentThread() return semfunc class event(object): """ Class encapsulating an event object to control sequential access to a resource """ def __init__(self, *args): self.evt = Event() self.evt.set() def __call__(self, f): def eventfunc(*args, **kwargs): try: print 'Waiting on event =>',currentThread() self.evt.wait() # First thread will clear the event and # make others wait, once it is done with the # job, it sets the event which wakes up # another thread, which does the same thing... # This provides sequential access to a # resource... self.evt.clear() print 'Cleared event =>',currentThread() try: return f(*args, **kwargs) except Exception, e: raise finally: # Wake up another thread... self.evt.set() print 'Set event=>',currentThread() return eventfunc ############################################################################## # Test Code # ############################################################################## # Demonstrating the synchronization classes... # Use a global list l=range(10) def reset(): global l l = range(10) # Not thread-safe def func1(begin, end): for x in range(begin, end): sleep(random()*0.5) l.append(x) # Thread-safe! @synchronized() def func2(begin, end): for x in range(begin, end): sleep(random()*0.5) l.append(x) # Limited access, thread-safe class DBConnection(object): """ A dummy db connection class """ MAX = 5 # We want to limit the number of DB connections to MAX # at a given time @semaphore(MAX) def connect(self, host): print "Connecting...",currentThread() # Sleep for some time sleep(3.0) pass # We want sequential access to this function @event() def connect2(self, host): print "Connecting...",currentThread() # Sleep for some time sleep(3.0) pass class PrintMsg(object): def startmsg(self): print '%s started...' % self.__class__.__name__ def endmsg(self): print '%s ended...' % self.__class__.__name__ class BaseThread(Thread, PrintMsg): pass class MyThread1(BaseThread): def run(self): self.startmsg() func1(10, 20) self.endmsg() class MyThread2(BaseThread): def run(self): self.startmsg() func1(20, 30) self.endmsg() class MyThread3(BaseThread): def run(self): self.startmsg() func2(10, 20) self.endmsg() class MyThread4(BaseThread): def run(self): self.startmsg() func2(20, 30) self.endmsg() class DBThread(BaseThread): def run(self): db = DBConnection() db.connect('localhost') class DBThread2(BaseThread): def run(self): db = DBConnection() db.connect2('localhost') print 'Starting the lock test...' t1 = MyThread1() t2 = MyThread2() t1.start(); t2.start() t1.join(); t2.join() # List will not have elements in order print l reset() t3 = MyThread3() t4 = MyThread4() t3.start(); t4.start() t3.join(); t4.join() # List will have elements in order print l sleep(3.0) print 'Starting the sem test...' # Sem test, init 8 threads and call connect # on the DBConnection object... for x in range(8): t = DBThread() t.start()# sleep(3.0) print 'Starting event test..' # Event test, init 8 threads and # increment counter for x in range(8): t = DBThread2() t.start() print 'All tests completed.' ############################################################################### # End of test code # ###############################################################################