Welcome, guest | Sign In | My Account | Store | Cart
# Synchronization classes using decorators. Provides synchronized, semaphore
# and event classes which provide transparent decorator patterns for
# Lock, BoundedSemaphore and Event objects in Python.

from threading import Thread, Lock, BoundedSemaphore, Event, currentThread
from time import sleep
from random import random
   
class synchronized(object):
   
""" Class enapsulating a lock and a function
    allowing it to be used as a synchronizing
    decorator making the wrapped function
    thread-safe """

   
   
def __init__(self, *args):
       
self.lock = Lock()
       
   
def __call__(self, f):
       
def lockedfunc(*args, **kwargs):
           
try:
               
self.lock.acquire()
               
print 'Acquired lock=>',currentThread()
               
try:
                   
return f(*args, **kwargs)
               
except Exception, e:
                   
raise
           
finally:
               
self.lock.release()
               
print 'Released lock=>',currentThread()

       
return lockedfunc


class semaphore(object):
   
""" Class encapsulating a semaphore to limit
    number of resources  """


   
def __init__(self, *args):
       
self.sem = BoundedSemaphore(args[0])
   
   
def __call__(self, f):
       
def semfunc(*args, **kwargs):
           
try:
               
print 'Trying to acquire sem=>',currentThread()
               
self.sem.acquire()
               
print 'Acquired sem=>',currentThread()
               
try:
                   
return f(*args, **kwargs)
               
except Exception, e:
                   
raise
           
finally:
               
self.sem.release()
               
print 'Released sem=>',currentThread()

       
       
return semfunc

class event(object):
   
""" Class encapsulating an event object to control
    sequential access to a resource """


   
def __init__(self, *args):
       
self.evt = Event()
       
self.evt.set()
   
   
def __call__(self, f):
       
def eventfunc(*args, **kwargs):
           
try:
               
print 'Waiting on event =>',currentThread()
               
self.evt.wait()
               
# First thread will clear the event and
               
# make others wait, once it is done with the
               
# job, it sets the event which wakes up
               
# another thread, which does the same thing...
               
# This provides sequential access to a
               
# resource...
               
self.evt.clear()
               
print 'Cleared event =>',currentThread()
               
try:
                   
return f(*args, **kwargs)
               
except Exception, e:
                   
raise
           
finally:
               
# Wake up another thread...
               
self.evt.set()
               
print 'Set event=>',currentThread()

       
return eventfunc

##############################################################################
# Test Code                                                                  #
##############################################################################
# Demonstrating the synchronization classes...
# Use a global list

l
=range(10)

def reset():
   
global l
    l
= range(10)

# Not thread-safe        
def func1(begin, end):
   
for x in range(begin, end):
        sleep
(random()*0.5)
        l
.append(x)

# Thread-safe!
@synchronized()
def func2(begin, end):
   
for x in range(begin, end):
        sleep
(random()*0.5)        
        l
.append(x)


# Limited access, thread-safe
class DBConnection(object):
   
""" A dummy db connection class """

    MAX
= 5
   
# We want to limit the number of DB connections to MAX
   
# at a given time
   
@semaphore(MAX)
   
def connect(self, host):
       
print "Connecting...",currentThread()
       
# Sleep for some time
        sleep
(3.0)
       
pass

   
# We want sequential access to this function
   
@event()
   
def connect2(self, host):
       
print "Connecting...",currentThread()
       
# Sleep for some time
        sleep
(3.0)
       
pass    
   

class PrintMsg(object):
   
def startmsg(self):
       
print '%s started...' % self.__class__.__name__
   
def endmsg(self):
       
print '%s ended...' % self.__class__.__name__        

class BaseThread(Thread, PrintMsg):
   
pass

class MyThread1(BaseThread):
   
def run(self):
       
self.startmsg()
        func1
(10, 20)
       
self.endmsg()        

class MyThread2(BaseThread):
   
def run(self):
       
self.startmsg()        
        func1
(20, 30)
       
self.endmsg()

class MyThread3(BaseThread):
   
def run(self):
       
self.startmsg()
        func2
(10, 20)
       
self.endmsg()        

class MyThread4(BaseThread):
   
def run(self):
       
self.startmsg()        
        func2
(20, 30)
       
self.endmsg()                

class DBThread(BaseThread):
   
def run(self):
        db
= DBConnection()
        db
.connect('localhost')


class DBThread2(BaseThread):
   
def run(self):
        db
= DBConnection()
        db
.connect2('localhost')


print 'Starting the lock test...'

t1
= MyThread1()
t2
= MyThread2()

t1
.start(); t2.start()
t1
.join(); t2.join()

# List will not have elements in order
print l

reset
()

t3
= MyThread3()
t4
= MyThread4()

t3
.start(); t4.start()
t3
.join(); t4.join()

# List will have elements in order
print l


sleep
(3.0)

print 'Starting the sem test...'
# Sem test, init 8 threads and call connect
# on the DBConnection object...
for x in range(8):
   t
= DBThread()
   t
.start()#

sleep
(3.0)

print 'Starting event test..'

# Event test, init 8 threads and
# increment counter
for x in range(8):
    t
= DBThread2()
    t
.start()

print 'All tests completed.'
###############################################################################
#  End of test code                                                           #
###############################################################################

History

  • revision 2 (16 years ago)
  • previous revisions are not available