Welcome, guest | Sign In | My Account | Store | Cart
import threading

__author__ = "Mateusz Kobos"

class RWLock:
	"""Synchronization object used in a solution of so-called second 
	readers-writers problem. In this problem, many readers can simultaneously 
	access a share, and a writer has an exclusive access to this share.
	Additionally, the following constraints should be met: 
	1) no reader should be kept waiting if the share is currently opened for 
		reading unless a writer is also waiting for the share, 
	2) no writer should be kept waiting for the share longer than absolutely 
		necessary. 
	
	The implementation is based on [1, secs. 4.2.2, 4.2.6, 4.2.7] 
	with a modification -- adding an additional lock (C{self.__readers_queue})
	-- in accordance with [2].
		
	Sources:
	[1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008
	[2] P.J. Courtois, F. Heymans, D.L. Parnas:
		"Concurrent Control with 'Readers' and 'Writers'", 
		Communications of the ACM, 1971 (via [3])
	[3] http://en.wikipedia.org/wiki/Readers-writers_problem
	"""
	
	def __init__(self):
		self.__read_switch = _LightSwitch()
		self.__write_switch = _LightSwitch()
		self.__no_readers = threading.Lock()
		self.__no_writers = threading.Lock()
		self.__readers_queue = threading.Lock()
		"""A lock giving an even higher priority to the writer in certain
		cases (see [2] for a discussion)"""
	
	def reader_acquire(self):
		self.__readers_queue.acquire()
		self.__no_readers.acquire()
		self.__read_switch.acquire(self.__no_writers)
		self.__no_readers.release()
		self.__readers_queue.release()
	
	def reader_release(self):
		self.__read_switch.release(self.__no_writers)
	
	def writer_acquire(self):
		self.__write_switch.acquire(self.__no_readers)
		self.__no_writers.acquire()
	
	def writer_release(self):
		self.__no_writers.release()
		self.__write_switch.release(self.__no_readers)
	

class _LightSwitch:
	"""An auxiliary "light switch"-like object. The first thread turns on the 
	"switch", the last one turns it off (see [1, sec. 4.2.2] for details)."""
	def __init__(self):
		self.__counter = 0
		self.__mutex = threading.Lock()
	
	def acquire(self, lock):
		self.__mutex.acquire()
		self.__counter += 1
		if self.__counter == 1:
			lock.acquire()
		self.__mutex.release()

	def release(self, lock):
		self.__mutex.acquire()
		self.__counter -= 1
		if self.__counter == 0:
			lock.release()
		self.__mutex.release()

##
## Unit testing code
## =================
##

import unittest
import threading
import time
import copy

class Writer(threading.Thread):
	def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write):
		"""
		@param buffer_: common buffer_ shared by the readers and writers
		@type buffer_: list
		@type rw_lock: L{RWLock}
		@param init_sleep_time: sleep time before doing any action
		@type init_sleep_time: C{float}
		@param sleep_time: sleep time while in critical section
		@type sleep_time: C{float}
		@param to_write: data that will be appended to the buffer
		"""
		threading.Thread.__init__(self)
		self.__buffer = buffer_
		self.__rw_lock = rw_lock
		self.__init_sleep_time = init_sleep_time
		self.__sleep_time = sleep_time
		self.__to_write = to_write
		self.entry_time = None
		"""Time of entry to the critical section"""
		self.exit_time = None
		"""Time of exit from the critical section"""
		
	def run(self):
		time.sleep(self.__init_sleep_time)
		self.__rw_lock.writer_acquire()
		self.entry_time = time.time()
		time.sleep(self.__sleep_time)
		self.__buffer.append(self.__to_write)
		self.exit_time = time.time()
		self.__rw_lock.writer_release()

class Reader(threading.Thread):
	def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time):
		"""
		@param buffer_: common buffer shared by the readers and writers
		@type buffer_: list
		@type rw_lock: L{RWLock}
		@param init_sleep_time: sleep time before doing any action
		@type init_sleep_time: C{float}
		@param sleep_time: sleep time while in critical section
		@type sleep_time: C{float}
		"""
		threading.Thread.__init__(self)
		self.__buffer = buffer_
		self.__rw_lock = rw_lock
		self.__init_sleep_time = init_sleep_time
		self.__sleep_time = sleep_time
		self.buffer_read = None
		"""a copy of a the buffer read while in critical section"""	
		self.entry_time = None
		"""Time of entry to the critical section"""
		self.exit_time = None
		"""Time of exit from the critical section"""

	def run(self):
		time.sleep(self.__init_sleep_time)
		self.__rw_lock.reader_acquire()
		self.entry_time = time.time()
		time.sleep(self.__sleep_time)
		self.buffer_read = copy.deepcopy(self.__buffer)
		self.exit_time = time.time()
		self.__rw_lock.reader_release()

class RWLockTestCase(unittest.TestCase):
	def test_readers_nonexclusive_access(self):
		(buffer_, rw_lock, threads) = self.__init_variables()

		threads.append(Reader(buffer_, rw_lock, 0, 0))
		threads.append(Writer(buffer_, rw_lock, 0.2, 0.4, 1))
		threads.append(Reader(buffer_, rw_lock, 0.3, 0.3))
		threads.append(Reader(buffer_, rw_lock, 0.5, 0))
		
		self.__start_and_join_threads(threads)
		
		## The third reader should enter after the second one but it should
		## exit before the second one exits 
		## (i.e. the readers should be in the critical section 
		## at the same time)

		self.assertEqual([], threads[0].buffer_read)
		self.assertEqual([1], threads[2].buffer_read)
		self.assertEqual([1], threads[3].buffer_read)
		self.assert_(threads[1].exit_time <= threads[2].entry_time)
		self.assert_(threads[2].entry_time <= threads[3].entry_time)
		self.assert_(threads[3].exit_time < threads[2].exit_time)
	
	def test_writers_exclusive_access(self):
		(buffer_, rw_lock, threads) = self.__init_variables()

		threads.append(Writer(buffer_, rw_lock, 0, 0.4, 1))
		threads.append(Writer(buffer_, rw_lock, 0.1, 0, 2))
		threads.append(Reader(buffer_, rw_lock, 0.2, 0))
		
		self.__start_and_join_threads(threads)
		
		## The second writer should wait for the first one to exit

		self.assertEqual([1, 2], threads[2].buffer_read)
		self.assert_(threads[0].exit_time <= threads[1].entry_time)
		self.assert_(threads[1].exit_time <= threads[2].exit_time)
	
	def test_writer_priority(self):
		(buffer_, rw_lock, threads) = self.__init_variables()
		
		threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
		threads.append(Reader(buffer_, rw_lock, 0.1, 0.4))
		threads.append(Writer(buffer_, rw_lock, 0.2, 0, 2))
		threads.append(Reader(buffer_, rw_lock, 0.3, 0))
		threads.append(Reader(buffer_, rw_lock, 0.3, 0))
		
		self.__start_and_join_threads(threads)
		
		## The second writer should go before the second and the third reader
		
		self.assertEqual([1], threads[1].buffer_read)
		self.assertEqual([1, 2], threads[3].buffer_read)
		self.assertEqual([1, 2], threads[4].buffer_read)		
		self.assert_(threads[0].exit_time < threads[1].entry_time)
		self.assert_(threads[1].exit_time <= threads[2].entry_time)
		self.assert_(threads[2].exit_time <= threads[3].entry_time)
		self.assert_(threads[2].exit_time <= threads[4].entry_time)
	
	def test_many_writers_priority(self):
		(buffer_, rw_lock, threads) = self.__init_variables()
		
		threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
		threads.append(Reader(buffer_, rw_lock, 0.1, 0.6))
		threads.append(Writer(buffer_, rw_lock, 0.2, 0.1, 2))
		threads.append(Reader(buffer_, rw_lock, 0.3, 0))
		threads.append(Reader(buffer_, rw_lock, 0.4, 0))
		threads.append(Writer(buffer_, rw_lock, 0.5, 0.1, 3))
		
		self.__start_and_join_threads(threads)

		## The two last writers should go first -- after the first reader and
		## before the second and the third reader
		
		self.assertEqual([1], threads[1].buffer_read)
		self.assertEqual([1, 2, 3], threads[3].buffer_read)
		self.assertEqual([1, 2, 3], threads[4].buffer_read)		
		self.assert_(threads[0].exit_time < threads[1].entry_time)
		self.assert_(threads[1].exit_time <= threads[2].entry_time)
		self.assert_(threads[1].exit_time <= threads[5].entry_time)
		self.assert_(threads[2].exit_time <= threads[3].entry_time)
		self.assert_(threads[2].exit_time <= threads[4].entry_time)		
		self.assert_(threads[5].exit_time <= threads[3].entry_time)
		self.assert_(threads[5].exit_time <= threads[4].entry_time)	
	
	@staticmethod
	def __init_variables():
		buffer_ = []
		rw_lock = RWLock()
		threads = []
		return (buffer_, rw_lock, threads)

	@staticmethod
	def __start_and_join_threads(threads):
		for t in threads:
			t.start()
		for t in threads:
			t.join()

Diff to Previous Revision

--- revision 1 2011-07-22 18:49:20
+++ revision 2 2011-09-28 21:45:04
@@ -84,10 +84,10 @@
 import copy
 
 class Writer(threading.Thread):
-	def __init__(self, buffer, rw_lock, init_sleep_time, sleep_time, to_write):
-		"""
-		@param buffer: common buffer shared by the readers and writers
-		@type buffer: list
+	def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write):
+		"""
+		@param buffer_: common buffer_ shared by the readers and writers
+		@type buffer_: list
 		@type rw_lock: L{RWLock}
 		@param init_sleep_time: sleep time before doing any action
 		@type init_sleep_time: C{float}
@@ -96,7 +96,7 @@
 		@param to_write: data that will be appended to the buffer
 		"""
 		threading.Thread.__init__(self)
-		self.__buffer = buffer
+		self.__buffer = buffer_
 		self.__rw_lock = rw_lock
 		self.__init_sleep_time = init_sleep_time
 		self.__sleep_time = sleep_time
@@ -116,10 +116,10 @@
 		self.__rw_lock.writer_release()
 
 class Reader(threading.Thread):
-	def __init__(self, buffer, rw_lock, init_sleep_time, sleep_time):
-		"""
-		@param buffer: common buffer shared by the readers and writers
-		@type buffer: list
+	def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time):
+		"""
+		@param buffer_: common buffer shared by the readers and writers
+		@type buffer_: list
 		@type rw_lock: L{RWLock}
 		@param init_sleep_time: sleep time before doing any action
 		@type init_sleep_time: C{float}
@@ -127,7 +127,7 @@
 		@type sleep_time: C{float}
 		"""
 		threading.Thread.__init__(self)
-		self.__buffer = buffer
+		self.__buffer = buffer_
 		self.__rw_lock = rw_lock
 		self.__init_sleep_time = init_sleep_time
 		self.__sleep_time = sleep_time
@@ -166,8 +166,8 @@
 		self.assertEqual([], threads[0].buffer_read)
 		self.assertEqual([1], threads[2].buffer_read)
 		self.assertEqual([1], threads[3].buffer_read)
-		self.assert_(threads[1].exit_time < threads[2].entry_time)
-		self.assert_(threads[2].entry_time < threads[3].entry_time)
+		self.assert_(threads[1].exit_time <= threads[2].entry_time)
+		self.assert_(threads[2].entry_time <= threads[3].entry_time)
 		self.assert_(threads[3].exit_time < threads[2].exit_time)
 	
 	def test_writers_exclusive_access(self):
@@ -182,8 +182,8 @@
 		## The second writer should wait for the first one to exit
 
 		self.assertEqual([1, 2], threads[2].buffer_read)
-		self.assert_(threads[0].exit_time < threads[1].entry_time)
-		self.assert_(threads[1].exit_time < threads[2].exit_time)
+		self.assert_(threads[0].exit_time <= threads[1].entry_time)
+		self.assert_(threads[1].exit_time <= threads[2].exit_time)
 	
 	def test_writer_priority(self):
 		(buffer_, rw_lock, threads) = self.__init_variables()
@@ -202,9 +202,9 @@
 		self.assertEqual([1, 2], threads[3].buffer_read)
 		self.assertEqual([1, 2], threads[4].buffer_read)		
 		self.assert_(threads[0].exit_time < threads[1].entry_time)
-		self.assert_(threads[1].exit_time < threads[2].entry_time)
-		self.assert_(threads[2].exit_time < threads[3].entry_time)
-		self.assert_(threads[2].exit_time < threads[4].entry_time)
+		self.assert_(threads[1].exit_time <= threads[2].entry_time)
+		self.assert_(threads[2].exit_time <= threads[3].entry_time)
+		self.assert_(threads[2].exit_time <= threads[4].entry_time)
 	
 	def test_many_writers_priority(self):
 		(buffer_, rw_lock, threads) = self.__init_variables()
@@ -225,12 +225,12 @@
 		self.assertEqual([1, 2, 3], threads[3].buffer_read)
 		self.assertEqual([1, 2, 3], threads[4].buffer_read)		
 		self.assert_(threads[0].exit_time < threads[1].entry_time)
-		self.assert_(threads[1].exit_time < threads[2].entry_time)
-		self.assert_(threads[1].exit_time < threads[5].entry_time)
-		self.assert_(threads[2].exit_time < threads[3].entry_time)
-		self.assert_(threads[2].exit_time < threads[4].entry_time)		
-		self.assert_(threads[5].exit_time < threads[3].entry_time)
-		self.assert_(threads[5].exit_time < threads[4].entry_time)	
+		self.assert_(threads[1].exit_time <= threads[2].entry_time)
+		self.assert_(threads[1].exit_time <= threads[5].entry_time)
+		self.assert_(threads[2].exit_time <= threads[3].entry_time)
+		self.assert_(threads[2].exit_time <= threads[4].entry_time)		
+		self.assert_(threads[5].exit_time <= threads[3].entry_time)
+		self.assert_(threads[5].exit_time <= threads[4].entry_time)	
 	
 	@staticmethod
 	def __init_variables():

History