Welcome, guest | Sign In | My Account | Store | Cart
# Synchronization classes using decorators. Provides synchronized, semaphore
# and event classes which provide transparent decorator patterns for
# Lock, BoundedSemaphore and Event objects in Python.

from threading import Thread, Lock, BoundedSemaphore, Event, currentThread
from time import sleep
from random import random
    
class synchronized(object):
    """ Class enapsulating a lock and a function
    allowing it to be used as a synchronizing
    decorator making the wrapped function
    thread-safe """
    
    def __init__(self, *args):
        self.lock = Lock()
        
    def __call__(self, f):
        def lockedfunc(*args, **kwargs):
            try:
                self.lock.acquire()
                print 'Acquired lock=>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                self.lock.release()
                print 'Released lock=>',currentThread()

        return lockedfunc


class semaphore(object):
    """ Class encapsulating a semaphore to limit
    number of resources  """

    def __init__(self, *args):
        self.sem = BoundedSemaphore(args[0])
    
    def __call__(self, f):
        def semfunc(*args, **kwargs):
            try:
                print 'Trying to acquire sem=>',currentThread()
                self.sem.acquire()
                print 'Acquired sem=>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                self.sem.release()
                print 'Released sem=>',currentThread()

        
        return semfunc

class event(object):
    """ Class encapsulating an event object to control
    sequential access to a resource """

    def __init__(self, *args):
        self.evt = Event()
        self.evt.set()
    
    def __call__(self, f):
        def eventfunc(*args, **kwargs):
            try:
                print 'Waiting on event =>',currentThread()
                self.evt.wait()
                # First thread will clear the event and
                # make others wait, once it is done with the
                # job, it sets the event which wakes up
                # another thread, which does the same thing...
                # This provides sequential access to a
                # resource...
                self.evt.clear()
                print 'Cleared event =>',currentThread()
                try:
                    return f(*args, **kwargs)
                except Exception, e:
                    raise
            finally:
                # Wake up another thread...
                self.evt.set()
                print 'Set event=>',currentThread()

        return eventfunc

##############################################################################
# Test Code                                                                  #
##############################################################################
# Demonstrating the synchronization classes...
# Use a global list

l=range(10)

def reset():
    global l
    l = range(10)

# Not thread-safe        
def func1(begin, end):
    for x in range(begin, end):
        sleep(random()*0.5)
        l.append(x)

# Thread-safe!
@synchronized()
def func2(begin, end):
    for x in range(begin, end):
        sleep(random()*0.5)        
        l.append(x)


# Limited access, thread-safe
class DBConnection(object):
    """ A dummy db connection class """

    MAX = 5
    # We want to limit the number of DB connections to MAX
    # at a given time
    @semaphore(MAX)
    def connect(self, host):
        print "Connecting...",currentThread()
        # Sleep for some time
        sleep(3.0)
        pass

    # We want sequential access to this function
    @event()
    def connect2(self, host):
        print "Connecting...",currentThread()
        # Sleep for some time
        sleep(3.0)
        pass    
    

class PrintMsg(object):
    def startmsg(self):
        print '%s started...' % self.__class__.__name__
    def endmsg(self):
        print '%s ended...' % self.__class__.__name__        

class BaseThread(Thread, PrintMsg):
    pass

class MyThread1(BaseThread):
    def run(self):
        self.startmsg()
        func1(10, 20)
        self.endmsg()        

class MyThread2(BaseThread):
    def run(self):
        self.startmsg()        
        func1(20, 30)
        self.endmsg()

class MyThread3(BaseThread):
    def run(self):
        self.startmsg()
        func2(10, 20)
        self.endmsg()        

class MyThread4(BaseThread):
    def run(self):
        self.startmsg()        
        func2(20, 30)
        self.endmsg()                

class DBThread(BaseThread):
    def run(self):
        db = DBConnection()
        db.connect('localhost')


class DBThread2(BaseThread):
    def run(self):
        db = DBConnection()
        db.connect2('localhost')


print 'Starting the lock test...'

t1 = MyThread1()
t2 = MyThread2()

t1.start(); t2.start()
t1.join(); t2.join()

# List will not have elements in order
print l

reset()

t3 = MyThread3()
t4 = MyThread4()

t3.start(); t4.start()
t3.join(); t4.join()

# List will have elements in order
print l


sleep(3.0)

print 'Starting the sem test...'
# Sem test, init 8 threads and call connect
# on the DBConnection object...
for x in range(8):
   t = DBThread()
   t.start()#

sleep(3.0)

print 'Starting event test..'

# Event test, init 8 threads and 
# increment counter
for x in range(8):
    t = DBThread2()
    t.start()

print 'All tests completed.'
###############################################################################
#  End of test code                                                           #
###############################################################################

History

  • revision 2 (16 years ago)
  • previous revisions are not available