In multithreaded apps, there is at times the need to control access to a resource to ensure data consistency and integrity. Multiple-reader, one-writer locking allows efficient read access by multiple threads, while ensuring that a write does not overlap any reads nor another write.
| ######### mrow.py #########
"""Multiple-reader-one-writer resource locking."""
from thread import get_ident
import threading
def acquireLocked(fn):
    """Wrap method ``fn`` so it runs while holding ``self.acquireLock``.

    ``fn`` is called as ``fn(self, *args, **kw)`` and its return value (or
    exception) is passed through unchanged.  The lock is always released
    afterwards.
    """
    def _acquireLocked_wrapper(self, *args, **kw):
        L = self.acquireLock
        # Acquire *before* entering the try block: if acquire() itself fails
        # we must not run the finally clause, which would release a lock this
        # call never obtained and mask the original error.
        L.acquire()
        try:
            return fn(self, *args, **kw)
        finally:
            L.release()
    return _acquireLocked_wrapper
def releaseLocked(fn):
    """Wrap method ``fn`` so it runs while holding ``self.releaseLock``.

    ``fn`` is called as ``fn(self, *args, **kw)`` and its return value (or
    exception) is passed through unchanged.  The lock is always released
    afterwards.
    """
    def _releaseLocked_wrapper(self, *args, **kw):
        L = self.releaseLock
        # Acquire *before* entering the try block: if acquire() itself fails
        # we must not run the finally clause, which would release a lock this
        # call never obtained and mask the original error.
        L.acquire()
        try:
            return fn(self, *args, **kw)
        finally:
            L.release()
    return _releaseLocked_wrapper
class RWLock(object):
    """MROW resource lock.

    ``reader()`` and ``writer()`` block until access is granted and return an
    acquired sublock; call ``release()`` on it once for every acquisition.

    * Any number of threads may hold read access at the same time.
    * Write access is exclusive.
    * Acquisitions are reentrant per thread; a thread that holds write
      access gets its write sublock back from ``reader()``.
    * A thread holding read access may call ``writer()``: its read access is
      given up, it waits its turn, and every sublock it holds (including
      the original read sublock) then counts towards the write lock.

    All bookkeeping is guarded by one mutex, and requests that cannot be
    granted immediately are served in FIFO order.  Reentrant acquisitions
    never queue, so they cannot block behind a waiting thread.
    """

    def __init__(self):
        # A single mutex guards all state below.  Both historical attribute
        # names are kept and refer to that same mutex: using two independent
        # locks let a release run between a requester's state check and its
        # enqueue, losing the wake-up.
        self.acquireLock = self.releaseLock = threading.Lock()
        self.sublocks = []        # sublocks that are currently granted
        self.waiting = []         # FIFO of sublocks not yet granted
        self.readers = 0          # threads currently holding read access
        self.writing = False      # True while a thread holds write access
        self.threadReaders = {}   # thread ident -> (sublock, hold count)
        self.threadWriters = {}   # thread ident -> (sublock, hold count)
        self._pending = {}        # queued sublock -> (ident, count to grant)
        self._upgraded = {}       # ident -> read sublock given up on upgrade

    def reader(self):
        """Return an acquired read lock."""
        ident = get_ident()
        mutex = self.acquireLock
        mutex.acquire()
        try:
            if ident in self.threadReaders:
                return self._reenter(self.threadReaders, ident)
            if ident in self.threadWriters:
                # Writers are inherently readers, so treat as a reentrant
                # write lock.
                return self._reenter(self.threadWriters, ident)
            sublock = self._enqueue(RLock(self), ident, 1)
        finally:
            mutex.release()
        # Blocks (outside the mutex) until the request has been granted.
        sublock.acquire()
        return sublock

    def writer(self):
        """Return an acquired write lock."""
        ident = get_ident()
        mutex = self.acquireLock
        mutex.acquire()
        try:
            if ident in self.threadWriters:
                return self._reenter(self.threadWriters, ident)
            count = 0
            if ident in self.threadReaders:
                # Readers-turned-writers give up their read access and wait
                # for the remaining reads to complete.  The hold count is
                # carried over so the earlier read acquisitions still have
                # to be released.
                stale, count = self.threadReaders.pop(ident)
                self.readers -= 1
                self.sublocks.remove(stale)
                self._upgraded[ident] = stale
            sublock = self._enqueue(WLock(self), ident, count + 1)
        finally:
            mutex.release()
        # Blocks (outside the mutex) until the request has been granted.
        sublock.acquire()
        return sublock

    def _reenter(self, table, ident):
        """Bump the hold count of ``ident`` in ``table``; return its sublock.

        Must be called with the mutex held.
        """
        sublock, count = table[ident]
        table[ident] = (sublock, count + 1)
        return sublock

    def _enqueue(self, sublock, ident, count):
        """Queue ``sublock`` for ``ident`` and grant whatever can be granted.

        Must be called with the mutex held.  The sublock's inner lock is
        taken here so that the caller's following ``sublock.acquire()``
        blocks until ``_grantWaiting`` opens it again.
        """
        sublock.acquire()
        self._pending[sublock] = (ident, count)
        self.waiting.append(sublock)
        self._grantWaiting()
        return sublock

    def _grantWaiting(self):
        """Grant queued requests from the head of the queue, in order.

        Must be called with the mutex held.  Consecutive read requests are
        granted together; a write request is granted only when nobody holds
        the resource, and it holds back everything queued behind it.
        """
        waiting = self.waiting
        while waiting:
            head = waiting[0]
            if isinstance(head, WLock):
                if self.readers or self.writing:
                    break
                ident, count = self._pending.pop(head)
                self.writing = True
                self.threadWriters[ident] = (head, count)
            else:
                if self.writing:
                    break
                ident, count = self._pending.pop(head)
                self.readers += 1
                self.threadReaders[ident] = (head, count)
            del waiting[0]
            self.sublocks.append(head)
            # Wake the thread blocked in reader()/writer() on this sublock.
            head._release()

    def _releaseWrite(self, ident):
        """Drop one hold of the write lock owned by thread ``ident``.

        Must be called with the mutex held.
        """
        sublock, count = self.threadWriters[ident]
        if count > 1:
            self.threadWriters[ident] = (sublock, count - 1)
            return
        del self.threadWriters[ident]
        self._upgraded.pop(ident, None)
        self.writing = False
        self.sublocks.remove(sublock)
        self._grantWaiting()

    def _releaseR(self, sublock):
        """Release one hold of read sublock ``sublock`` (see RLock.release)."""
        mutex = self.releaseLock
        mutex.acquire()
        try:
            ident = get_ident()
            if sublock in self.sublocks:
                held, count = self.threadReaders[ident]
                if count > 1:
                    self.threadReaders[ident] = (held, count - 1)
                else:
                    del self.threadReaders[ident]
                    self.readers -= 1
                    self.sublocks.remove(sublock)
                    self._grantWaiting()
            elif self._upgraded.get(ident) is sublock:
                # This thread turned its read lock into a write lock; the
                # old read sublock now stands for one hold of the write
                # lock.  Ignoring it would leave the write lock held forever.
                self._releaseWrite(ident)
        finally:
            mutex.release()

    def _releaseW(self, sublock):
        """Release one hold of write sublock ``sublock`` (see WLock.release)."""
        mutex = self.releaseLock
        mutex.acquire()
        try:
            if sublock in self.sublocks:
                self._releaseWrite(get_ident())
        finally:
            mutex.release()


class SubLock(object):
    """Handle returned by RWLock; wraps the lock a requester blocks on."""

    def __init__(self, rwlock):
        self.lock = threading.Lock()
        self.rwlock = rwlock

    def _release(self):
        # Called by the owning RWLock to wake the thread blocked in acquire().
        self.lock.release()

    def acquire(self):
        self.lock.acquire()


class RLock(SubLock):
    """Read sublock handed out by RWLock.reader()."""

    def release(self):
        self.rwlock._releaseR(self)


class WLock(SubLock):
    """Write sublock handed out by RWLock.writer()."""

    def release(self):
        self.rwlock._releaseW(self)
######### test_mrow.py #########
import threading
import time
from schevo.lib import mrow
def writer(L, value, after, rwlock, times):
    """Append value to L after a period of time.

    Takes two write locks and one read lock from ``rwlock``, appends a start
    timestamp to ``times``, sleeps ``after`` seconds, appends ``value`` to
    ``L``, then appends an end timestamp to ``times`` and releases the locks
    in reverse order of acquisition.
    """
    # Every lock is acquired just before the try block that releases it, so
    # a failed acquisition propagates as-is instead of being replaced by an
    # unbound-variable error raised from the cleanup code.
    lock = rwlock.writer()
    try:
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.writer()
        try:
            # Get a reader lock too; should be the same as getting another
            # writer since writers are inherently readers as well.
            lock3 = rwlock.reader()
            try:
                times.append(time.time())
                time.sleep(after)
                L.append(value)
            finally:
                times.append(time.time())
                lock3.release()
        finally:
            lock2.release()
    finally:
        lock.release()
def reader(L1, L2, after, rwlock, times):
    """Append values from L1 to L2 after a period of time.

    Takes two read locks from ``rwlock``, appends a start timestamp to
    ``times``, sleeps ``after`` seconds, extends ``L2`` with ``L1``, then
    appends an end timestamp to ``times`` and releases the locks in reverse
    order of acquisition.
    """
    # Every lock is acquired just before the try block that releases it, so
    # a failed acquisition propagates as-is instead of being replaced by an
    # unbound-variable error raised from the cleanup code.
    lock = rwlock.reader()
    try:
        # Get another lock, to test the fact that obtaining multiple
        # read locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.reader()
        try:
            times.append(time.time())
            time.sleep(after)
            L2.extend(L1)
        finally:
            times.append(time.time())
            lock2.release()
    finally:
        lock.release()
def readerTurnedWriter(L, value, after, rwlock, times):
    """Append value to L after a period of time.

    Takes a read lock and then a write lock from ``rwlock`` in the same
    thread, appends a start timestamp to ``times``, sleeps ``after``
    seconds, appends ``value`` to ``L``, then appends an end timestamp to
    ``times`` and releases the write lock followed by the read lock.
    """
    # Every lock is acquired just before the try block that releases it, so
    # a failed acquisition propagates as-is instead of being replaced by an
    # unbound-variable error raised from the cleanup code.
    lock = rwlock.reader()
    try:
        lock2 = rwlock.writer()
        try:
            times.append(time.time())
            time.sleep(after)
            L.append(value)
        finally:
            times.append(time.time())
            lock2.release()
    finally:
        lock.release()
def test_reentrancy():
    """Nested acquisitions from a single thread must never block."""
    rw = mrow.RWLock()
    # Same-kind nesting: two read locks first, then two write locks, each
    # pair released innermost-first.
    for acquire in (rw.reader, rw.writer):
        outer = acquire()
        inner = acquire()
        inner.release()
        outer.release()
    # Mixed nesting: a thread holding the write lock can also take a read
    # lock, because writers are also readers.
    held_write = rw.writer()
    held_read = rw.reader()
    held_read.release()
    held_write.release()
def test_writeReadRead():
    """A writer followed by two readers.

    The two reads must overlap each other, and neither may start before the
    write has finished.
    """
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW, TR1, TR2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.2, lock, TW),
    )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.2, lock, TR1),
    )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.2, lock, TR2),
    )
    thread1.start()
    # Give the writer a head start before the readers are launched.
    time.sleep(0.1)
    thread2.start()
    thread3.start()
    # Wait for the workers themselves rather than sleeping a fixed amount:
    # the test finishes as soon as they do and no longer depends on the
    # machine keeping up with a hard-coded delay.  The timeout keeps a
    # deadlocked lock from hanging the test; the assertions below then fail.
    for worker in (thread1, thread2, thread3):
        worker.join(5.0)
    assert 'foo' in R1
    assert 'foo' in R2
    assert TR1[0] <= TR2[1]  # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]  # Read 2 started during read 1.
    assert TR1[0] >= TW[1]   # Read 1 started after write.
    assert TR2[0] >= TW[1]   # Read 2 started after write.
def test_writeReadReadWrite():
    """A writer, two readers, then a second writer.

    The reads must overlap each other, start only after the first write has
    finished, and finish before the second write starts.
    """
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW1, TR1, TR2, TW2 = [], [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
    )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
    )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.3, lock, TR2),
    )
    thread4 = threading.Thread(
        target=writer,
        args=(W, 'bar', 0.3, lock, TW2),
    )
    # Stagger the starts so the requests reach the lock in a known order.
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(0.1)
    thread4.start()
    # Wait for the workers themselves rather than sleeping a fixed amount:
    # the test finishes as soon as they do and no longer depends on the
    # machine keeping up with a hard-coded delay.  The timeout keeps a
    # deadlocked lock from hanging the test; the assertions below then fail.
    for worker in (thread1, thread2, thread3, thread4):
        worker.join(5.0)
    assert 'foo' in R1
    assert 'foo' in R2
    assert 'bar' not in R1
    assert 'bar' not in R2
    assert 'bar' in W
    assert TR1[0] <= TR2[1]   # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]   # Read 2 started during read 1.
    assert TR1[0] >= TW1[1]   # Read 1 started after write 1.
    assert TR2[0] >= TW1[1]   # Read 2 started after write 1.
    assert TW2[0] >= TR1[1]   # Write 2 started after read 1.
    assert TW2[0] >= TR2[1]   # Write 2 started after read 2.
def test_writeReadReadtowrite():
    """A writer, a reader, then a reader that turns into a writer.

    The plain read must start only after the first write has finished, and
    the upgraded write must start only after the plain read has finished.
    """
    lock = mrow.RWLock()
    W, R1 = [], []
    TW1, TR1, TW2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
    )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
    )
    thread3 = threading.Thread(
        target=readerTurnedWriter,
        args=(W, 'bar', 0.3, lock, TW2),
    )
    # Stagger the starts so the requests reach the lock in a known order.
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    # Wait for the workers themselves rather than sleeping a fixed amount:
    # the test finishes as soon as they do and no longer depends on the
    # machine keeping up with a hard-coded delay.  The timeout keeps a
    # deadlocked lock from hanging the test; the assertions below then fail.
    for worker in (thread1, thread2, thread3):
        worker.join(5.0)
    assert 'foo' in R1
    assert 'bar' not in R1
    assert 'bar' in W
    assert TR1[0] >= TW1[1]   # Read 1 started after write 1.
    assert TW2[0] >= TR1[1]   # Write 2 started after read 1.
|
After implementing a single read-write database locking mechanism in Schevo, Dominic Fox suggested that Schevo use a multiple-reader-one-writer (MROW) locking mechanism for greater efficiency during reads.
We needed a mechanism that had the following qualities:
robust
reentrant locks
simple interface
unit tested
This implementation implements at least the last three of these qualities, and seems to be sufficiently robust.
If anyone has any suggestions or patches that help improve this recipe, I'd love to incorporate them.
Heh. Wrote one of these myself last week! See http://www.codepoetics.com/code/tripoli/synchronization.py.
Oh yeah, Like it says at the end there...
Incidentally, I will be looking at ways of writing unit tests for multi-threading components some time next week (I have a couple of tricks in mind, using the threadable function and AsyncResult class from http://www.codepoetics.com/code/tripoli/concurrency.py); MrowLockable will be the first candidate for testing.
reentrancy. Your work definitely inspired this recipe; I thank you for that :)
Consider adding reentrancy to locks acquired in the same thread; for instance, calling outer() in an instance of the class below will deadlock:
deadlock corner cases. I've updated the recipe to avoid some potential deadlock scenarios:
Threads that have acquired a write lock can subsequently acquire read locks, as writers are implicitly readers as well.
Threads that have acquired a read lock can subsequently acquire a write lock. The operation will block until existing read locks are released, then the read lock will morph into a write lock for the remainder of the lock acquisition's life span.
Caveat: The latter, while preventing a deadlock, might cause already-read data to become stale if a read lock morphs into a write lock after another write lock has been requested. I'll probably write a test case to illustrate this at some point.
Check out Doug Lea's books and web site. A lot of inspiration can be taken from Doug Lea's work. First of all, there is the Java util.concurrent package (which has been integrated into Java 5) :
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
This package is explained in detail in a very good book, Concurrent Programming in Java :
http://www.amazon.com/exec/obidos/ASIN/0201310090/
Lock-free structures. Just read an interesting article in C++ Journal recently, about lock-free structures, which are perfect for read-many-write-rarely and hence for MROW (as one writer cannot stumble onto itself - see code below, it's hopefully clear...).
(From memory) The basic idea is to use a pointer to a structure. Readers can read as much as they like, by reading the pointer, then going to the structure, without any locking. Writers make a copy of the structure, modify it, and then must atomically test-and-set the pointer to the structure.
This test-and-set operation was proved to be the most robust and efficient way to achieve lock-free resource access, and is implemented as one assembly instruction in many existing processors. In pure python, it would have to lock and would look something like this:
And used like this (assuming 'pointer.pointee' is a structure):
No time now to implement, please give it a go and tell me ;-)
The advantage in Python is that the discarded object will be garbage-collected. In non-GC C++, there's a lot of work to do!
It would be good to have this test_and_set as a native atomic operation!
Adding re-entrancy. I took a slightly different approach to re-entrancy in the end, adding a re-entrant wrapper to the non-reentrant lock class. This keeps the implementation of the basic lock class simple, and provides a fairly clean separation of concerns.
One other modification I made was to force any read request from a new thread to block if there were any write requests pending, so that readers could not starve writers of access.
The new code, together with some unit tests, is part of the "syncopated" module in Tripoli, which is maintained as a separate module and licensed under the Lesser GPL. See http://www.sourceforge.net/projects/tripoli for downloads.
There is a bug in this recipe. The exception I reproduced was "error, releasing unacquired lock", raised by simple code that reloads a configuration file (under the writer lock) while other threads use the reader lock to read. After replacing it with recipe 502283, without making any code changes to the dependencies (same module name), my problem was resolved.
I replaced my code with http://code.activestate.com/recipes/502283/ instead, adding in
class RWLock (ReadWriteLock) :
Note that there is a proposal to add threading.RWLock to the standard library. It uses a different implementation.