######### mrow.py #########
"""Multiple-reader-one-writer resource locking."""
from thread import get_ident
import threading
def acquireLocked(fn):
    """Decorate method ``fn`` so it runs while holding ``self.acquireLock``.

    The returned wrapper acquires ``self.acquireLock``, calls
    ``fn(self, *args, **kw)``, and releases the lock again whether ``fn``
    returns or raises.  The wrapper keeps ``fn``'s name and docstring.
    """
    import functools

    def _acquireLocked_wrapper(self, *args, **kw):
        L = self.acquireLock
        # Acquire *before* entering ``try``: if acquiring fails, the lock
        # was never taken and must not be released by the ``finally``.
        L.acquire()
        try:
            return fn(self, *args, **kw)
        finally:
            L.release()

    return functools.wraps(fn)(_acquireLocked_wrapper)
def releaseLocked(fn):
    """Decorate method ``fn`` so it runs while holding ``self.releaseLock``.

    The returned wrapper acquires ``self.releaseLock``, calls
    ``fn(self, *args, **kw)``, and releases the lock again whether ``fn``
    returns or raises.  The wrapper keeps ``fn``'s name and docstring.
    """
    import functools

    def _releaseLocked_wrapper(self, *args, **kw):
        L = self.releaseLock
        # Acquire *before* entering ``try``: if acquiring fails, the lock
        # was never taken and must not be released by the ``finally``.
        L.acquire()
        try:
            return fn(self, *args, **kw)
        finally:
            L.release()

    return functools.wraps(fn)(_releaseLocked_wrapper)
class RWLock(object):
    """Multiple-reader-one-writer (MROW) resource lock.

    ``reader()`` and ``writer()`` block until access can be granted and
    return an already-acquired sublock; call that sublock's ``release()``
    from the same thread to give the access back.  Any number of threads
    may hold read access at once; write access is only granted while no
    other thread holds read or write access.

    Acquisition is reentrant per thread: a repeated ``reader()`` or
    ``writer()`` call returns the thread's existing sublock and bumps its
    hold count, and every hold needs its own ``release()``.  A thread that
    holds write access and asks for read access gets its write sublock
    back.  A thread that holds read access and asks for write access is
    upgraded: its read hold is dropped, it waits for the remaining readers
    to finish, and the releases still owed on the old read sublock are
    counted against the new write hold.

    Locking discipline:

    - ``acquireLock`` serializes *new* acquisitions and stays held while an
      acquirer is parked, so ``waiting`` never holds more than one sublock.
    - ``releaseLock`` guards all bookkeeping state.  Release paths and
      reentrant acquisitions take only this lock, so they never queue
      behind a parked acquirer.

    NOTE: an upgrade is a new acquisition.  If another thread is already
    parked waiting for the upgrading thread's read hold to go away, the
    two threads wait on each other forever.
    """

    def __init__(self):
        # Serializes new (non-reentrant) acquisitions; held while parked.
        self.acquireLock = threading.Lock()
        # Guards every piece of bookkeeping state below.
        self.releaseLock = threading.Lock()
        # Sublocks currently granted.
        self.sublocks = []
        # Sublocks parked until a release wakes them.
        self.waiting = []
        # Number of threads currently holding read access.
        self.readers = 0
        # True while a thread holds write access.
        self.writing = False
        # Thread ident -> (read sublock, hold count).
        self.threadReaders = {}
        # Thread ident -> (write sublock, hold count).
        self.threadWriters = {}
        # Thread ident -> (old read sublock, releases still owed on it) for
        # threads that upgraded from reader to writer.
        self._upgraded = {}

    def reader(self):
        """Return an acquired read lock."""
        ident = get_ident()
        sublock = self._reenter(
            ident, (self.threadReaders, self.threadWriters))
        if sublock is None:
            sublock = self._newReader(ident)
        return sublock

    def writer(self):
        """Return an acquired write lock."""
        ident = get_ident()
        sublock = self._reenter(ident, (self.threadWriters,))
        if sublock is None:
            sublock = self._newWriter(ident)
        return sublock

    def _reenter(self, ident, tables):
        """Bump and return the sublock thread `ident` already holds.

        `tables` are the ident -> (sublock, count) dicts to search, in
        order.  Returns None when the thread holds nothing in them.
        """
        for table in tables:
            if ident in table:
                sublock, count = table[ident]
                table[ident] = (sublock, count + 1)
                return sublock
        return None
    _reenter = releaseLocked(_reenter)

    def _newReader(self, ident):
        """Create a read sublock and block until it has been granted."""
        sublock = RLock(self)
        # Take the inner lock once so the acquire in the loop parks this
        # thread until a releasing thread calls sublock._release().
        sublock.acquire()
        while not self._admitReader(ident, sublock):
            sublock.acquire()
        return sublock
    _newReader = acquireLocked(_newReader)

    def _newWriter(self, ident):
        """Create a write sublock and block until it has been granted."""
        sublock = WLock(self)
        # Take the inner lock once so the acquire in the loop parks this
        # thread until a releasing thread calls sublock._release().
        sublock.acquire()
        while not self._admitWriter(ident, sublock):
            sublock.acquire()
        return sublock
    _newWriter = acquireLocked(_newWriter)

    def _admitReader(self, ident, sublock):
        """Grant `sublock` read access now, or queue it and return False.

        The check and the enqueue happen under ``releaseLock``, the same
        lock the release paths hold, so a wakeup cannot be missed.
        """
        if self.writing:
            # Wait for the acquired writer to release.
            self.waiting.append(sublock)
            return False
        self.readers += 1
        self.sublocks.append(sublock)
        self.threadReaders[ident] = (sublock, 1)
        return True
    _admitReader = releaseLocked(_admitReader)

    def _admitWriter(self, ident, sublock):
        """Grant `sublock` write access now, or queue it and return False.

        The check and the enqueue happen under ``releaseLock``, the same
        lock the release paths hold, so a wakeup cannot be missed.
        """
        entry = self.threadReaders.pop(ident, None)
        if entry is not None:
            # Readers-turned-writers must wait for any other reads to
            # complete before turning into writers.  Remember the old read
            # sublock so releasing it later is not ignored.
            oldSublock, count = entry
            self.readers -= 1
            self.sublocks.remove(oldSublock)
            self._upgraded[ident] = (oldSublock, count)
        if self.readers or self.writing:
            # Wait for acquired readers/writers to release.
            self.waiting.append(sublock)
            return False
        holds = 1
        if ident in self._upgraded:
            # Each hold on the old read sublock still needs a release.
            holds += self._upgraded[ident][1]
        self.writing = True
        self.sublocks.append(sublock)
        self.threadWriters[ident] = (sublock, holds)
        return True
    _admitWriter = releaseLocked(_admitWriter)

    def _wake(self):
        """Wake the sublock at the head of `waiting`, if any.

        When the head is a read lock, any read locks queued directly
        behind it are woken too.  Caller must hold ``releaseLock``.
        """
        waiting = self.waiting
        if not waiting:
            return
        head = waiting.pop(0)
        head._release()
        if isinstance(head, RLock):
            while waiting and isinstance(waiting[0], RLock):
                waiting.pop(0)._release()

    def _dropWriterHold(self, ident):
        """Drop one write hold of thread `ident`; free the lock at zero.

        Caller must hold ``releaseLock``.  Raises KeyError when the thread
        holds no write lock.
        """
        sublock, count = self.threadWriters[ident]
        count -= 1
        if count:
            self.threadWriters[ident] = (sublock, count)
            return
        del self.threadWriters[ident]
        self._upgraded.pop(ident, None)
        self.writing = False
        self.sublocks.remove(sublock)
        # Whoever is parked -- reader or writer -- may now proceed.
        self._wake()

    def _releaseR(self, sublock):
        """Release one hold on read lock `sublock` for the calling thread.

        A read sublock that is not currently granted is ignored, unless it
        is the one the calling thread held before upgrading to a writer;
        in that case the release is counted against the write hold.
        """
        ident = get_ident()
        if sublock in self.sublocks:
            count = self.threadReaders[ident][1] - 1
            if count:
                self.threadReaders[ident] = (sublock, count)
                return
            del self.threadReaders[ident]
            self.readers -= 1
            self.sublocks.remove(sublock)
            if not self.readers:
                # If a lock is waiting at this point, it is a write lock.
                self._wake()
            return
        promoted = self._upgraded.get(ident)
        if promoted is not None and promoted[0] is sublock:
            owed = promoted[1] - 1
            if owed:
                self._upgraded[ident] = (sublock, owed)
            else:
                del self._upgraded[ident]
            self._dropWriterHold(ident)
    _releaseR = releaseLocked(_releaseR)

    def _releaseW(self, sublock):
        """Release one hold on write lock `sublock` for the calling thread.

        A write sublock that is not currently granted is ignored.
        """
        if sublock in self.sublocks:
            self._dropWriterHold(get_ident())
    _releaseW = releaseLocked(_releaseW)
class SubLock(object):
    """Base class for the per-acquisition locks handed out by a lock owner.

    Wraps a private ``threading.Lock`` and remembers the owning `rwlock`.
    Subclasses provide ``release()``, which hands the sublock back to the
    owner.

    A sublock can be used as a context manager.  Entering does NOT acquire
    anything (the sublock is expected to be acquired already when it is
    handed out); leaving calls ``release()``::

        with rwlock.reader():
            ...
    """

    def __init__(self, rwlock):
        # Inner lock the owner uses to park and wake the waiting thread.
        self.lock = threading.Lock()
        # Owner that granted this sublock and receives the release.
        self.rwlock = rwlock

    def _release(self):
        """Unlock the inner lock (used by the owner to wake a waiter)."""
        self.lock.release()

    def acquire(self):
        """Lock the inner lock, blocking while it is already locked."""
        self.lock.acquire()

    def __enter__(self):
        """Return self without acquiring anything."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Call ``release()``; never suppresses an exception."""
        self.release()
        return False
class RLock(SubLock):
    """Sublock handed out for read access."""

    def release(self):
        """Give this read lock back through the owner's ``_releaseR``."""
        owner = self.rwlock
        owner._releaseR(self)
class WLock(SubLock):
    """Sublock handed out for write access."""

    def release(self):
        """Give this write lock back through the owner's ``_releaseW``."""
        owner = self.rwlock
        owner._releaseW(self)
######### test_mrow.py #########
import threading
import time
from schevo.lib import mrow
def writer(L, value, after, rwlock, times):
    """Append value to L after a period of time, holding a write lock.

    The time just after acquiring and the time just before releasing are
    appended to `times`.  Every lock that was actually acquired is
    released, newest first, even when a later acquisition or the work
    itself raises.
    """
    held = []
    try:
        held.append(rwlock.writer())
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        held.append(rwlock.writer())
        # Get a reader lock too; should be the same as getting another
        # writer since writers are inherently readers as well.
        held.append(rwlock.reader())
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        # Release only what was acquired, so a failed acquisition is not
        # masked by an unbound-name error and leaks nothing.
        while held:
            held.pop().release()
def reader(L1, L2, after, rwlock, times):
    """Append values from L1 to L2 after a period of time, holding a read lock.

    The time just after acquiring and the time just before releasing are
    appended to `times`.  Every lock that was actually acquired is
    released, newest first, even when a later acquisition or the work
    itself raises.
    """
    held = []
    try:
        held.append(rwlock.reader())
        # Get another lock, to test the fact that obtaining multiple
        # read locks from the same thread context doesn't block (lock
        # reentrancy).
        held.append(rwlock.reader())
        times.append(time.time())
        time.sleep(after)
        L2.extend(L1)
    finally:
        times.append(time.time())
        # Release only what was acquired, so a failed acquisition is not
        # masked by an unbound-name error and leaks nothing.
        while held:
            held.pop().release()
def readerTurnedWriter(L, value, after, rwlock, times):
    """Append value to L after a period of time, as a reader turned writer.

    A read lock is acquired first and a write lock second, from the same
    thread.  The time just after acquiring and the time just before
    releasing are appended to `times`.  Every lock that was actually
    acquired is released, newest first, even when a later acquisition or
    the work itself raises.
    """
    held = []
    try:
        held.append(rwlock.reader())
        held.append(rwlock.writer())
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        # Release only what was acquired, so a failed acquisition is not
        # masked by an unbound-name error and leaks nothing.
        while held:
            held.pop().release()
def test_reentrancy():
    """Nested acquisitions from one thread must return without blocking."""
    rwlock = mrow.RWLock()
    scenarios = (
        # Reentrant read locks.
        (rwlock.reader, rwlock.reader),
        # Reentrant write locks.
        (rwlock.writer, rwlock.writer),
        # Writers are also readers.
        (rwlock.writer, rwlock.reader),
    )
    for acquireOuter, acquireInner in scenarios:
        outer = acquireOuter()
        inner = acquireInner()
        # Release in reverse order of acquisition.
        inner.release()
        outer.release()
def test_writeReadRead():
    """One writer followed by two readers.

    Both readers must start only after the write has finished, and must
    overlap each other.
    """
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW, TR1, TR2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.2, lock, TW),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.2, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.2, lock, TR2),
        )
    thread1.start()
    # Give the writer time to take the lock before the readers ask.
    time.sleep(0.1)
    thread2.start()
    thread3.start()
    # Wait for the workers to finish instead of sleeping a fixed time;
    # the timeout keeps a deadlock from hanging the test run forever.
    for thread in (thread1, thread2, thread3):
        thread.join(5.0)
    # Every worker logs a start and an end time; anything less means it
    # did not finish.
    assert len(TW) == 2
    assert len(TR1) == 2
    assert len(TR2) == 2
    assert 'foo' in R1
    assert 'foo' in R2
    assert TR1[0] <= TR2[1]             # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]             # Read 2 started during read 1.
    assert TR1[0] >= TW[1]              # Read 1 started after write.
    assert TR2[0] >= TW[1]              # Read 2 started after write.
def test_writeReadReadWrite():
    """One writer, then two readers, then a second writer.

    The readers must start after the first write and overlap each other;
    the second write must start only after both reads have finished.
    """
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW1, TR1, TR2, TW2 = [], [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.3, lock, TR2),
        )
    thread4 = threading.Thread(
        target=writer,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    # Stagger the starts so the threads ask for the lock in this order.
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(0.1)
    thread4.start()
    # Wait for the workers to finish instead of sleeping a fixed time;
    # the timeout keeps a deadlock from hanging the test run forever.
    for thread in (thread1, thread2, thread3, thread4):
        thread.join(5.0)
    # Every worker logs a start and an end time; anything less means it
    # did not finish.
    assert len(TW1) == 2
    assert len(TR1) == 2
    assert len(TR2) == 2
    assert len(TW2) == 2
    assert 'foo' in R1
    assert 'foo' in R2
    assert 'bar' not in R1
    assert 'bar' not in R2
    assert 'bar' in W
    assert TR1[0] <= TR2[1]              # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]              # Read 2 started during read 1.
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TR2[0] >= TW1[1]              # Read 2 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.
    assert TW2[0] >= TR2[1]              # Write 2 started after read 2.
def test_writeReadReadtowrite():
    """One writer, then a reader, then a reader that turns into a writer.

    The read must start after the first write, and the upgraded write
    must start only after the plain read has finished.
    """
    lock = mrow.RWLock()
    W, R1 = [], []
    TW1, TR1, TW2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=readerTurnedWriter,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    # Stagger the starts so the threads ask for the lock in this order.
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    # Wait for the workers to finish instead of sleeping a fixed time;
    # the timeout keeps a deadlock from hanging the test run forever.
    for thread in (thread1, thread2, thread3):
        thread.join(5.0)
    # Every worker logs a start and an end time; anything less means it
    # did not finish.
    assert len(TW1) == 2
    assert len(TR1) == 2
    assert len(TW2) == 2
    assert 'foo' in R1
    assert 'bar' not in R1
    assert 'bar' in W
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.