Welcome, guest | Sign In | My Account | Store | Cart
######### mrow.py #########

"""Multiple-reader-one-writer resource locking."""

from thread import get_ident
import threading


def acquireLocked(fn):
   
def _acquireLocked_wrapper(self, *args, **kw):
        L
= self.acquireLock
       
try:
            L
.acquire()
            result
= fn(self, *args, **kw)
       
finally:
            L
.release()
       
return result
   
return _acquireLocked_wrapper


def releaseLocked(fn):
   
def _releaseLocked_wrapper(self, *args, **kw):
        L
= self.releaseLock
       
try:
            L
.acquire()
            result
= fn(self, *args, **kw)
       
finally:
            L
.release()
       
return result
   
return _releaseLocked_wrapper


class RWLock(object):
   
"""MROW resource lock."""

   
def __init__(self):
       
self.acquireLock = threading.Lock()
       
self.releaseLock = threading.Lock()
       
self.sublocks = []
       
self.waiting = []
       
self.readers = 0
       
self.writing = False
       
self.threadReaders = {}
       
self.threadWriters = {}

   
def reader(self):
       
"""Return an acquired read lock."""
        threadReaders
, threadWriters = self.threadReaders, self.threadWriters
        ident
= get_ident()
       
if ident in threadReaders:
            sublock
, count = threadReaders[ident]
            threadReaders
[ident] = (sublock, count + 1)
           
return sublock
       
elif ident in threadWriters:
           
# Writers are inherently readers, so treat as a reentrant
           
# write lock.
            sublock
, count = threadWriters[ident]
            threadWriters
[ident] = (sublock, count + 1)
           
return sublock
        sublock
= RLock(self)
       
if self.writing:
           
# Wait for acquired writers to release.
           
self.waiting.append(sublock)
            sublock
.acquire()
        sublock
.acquire()
       
self.readers += 1
       
self.sublocks.append(sublock)
        threadReaders
[ident] = (sublock, 1)
       
return sublock
    reader
= acquireLocked(reader)

   
def writer(self):
       
"""Return an acquired write lock."""
        threadReaders
, threadWriters = self.threadReaders, self.threadWriters
        ident
= get_ident()
        wasReader
= False
       
if ident in threadWriters:
            sublock
, count = threadWriters[ident]
            threadWriters
[ident] = (sublock, count + 1)
           
return sublock
       
elif ident in threadReaders:
           
# Readers-turned-writers must wait for any reads to complete
           
# before turning into writers.
            sublock
, count = threadReaders[ident]
           
del threadReaders[ident]
           
self.readers -= 1
           
self.sublocks.remove(sublock)
            wasReader
= True
        sublock
= WLock(self)
       
if self.readers or self.writing:
           
# Wait for acquired readers/writers to release.
           
self.waiting.append(sublock)
            sublock
.acquire()
        sublock
.acquire()
       
self.writing = True
       
self.sublocks.append(sublock)
       
if not wasReader:
            count
= 0
        threadWriters
[ident] = (sublock, count + 1)
       
return sublock
    writer
= acquireLocked(writer)

   
def _releaseR(self, sublock):
        sublocks
= self.sublocks
       
if sublock in sublocks:
            threadReaders
= self.threadReaders
            ident
= get_ident()
            count
= threadReaders[ident][1] - 1
           
if count:
                threadReaders
[ident] = (sublock, count)
           
else:
               
del threadReaders[ident]
               
self.readers -= 1
                sublocks
.remove(sublock)
                waiting
= self.waiting
               
if waiting and not self.readers:
                   
# If a lock is waiting at this point, it is a write lock.
                    waiting
.pop(0)._release()
    _releaseR
= releaseLocked(_releaseR)

   
def _releaseW(self, sublock):
        sublocks
= self.sublocks
       
if sublock in sublocks:
            threadWriters
= self.threadWriters
            ident
= get_ident()
            count
= threadWriters[ident][1] - 1
           
if count:
                threadWriters
[ident] = (sublock, count)
           
else:
               
del threadWriters[ident]
               
self.writing = False
                sublocks
.remove(sublock)
                waiting
= self.waiting
               
# Release any waiting read locks.
               
while waiting and isinstance(waiting[0], RLock):
                    waiting
.pop(0)._release()
    _releaseW
= releaseLocked(_releaseW)


class SubLock(object):

   
def __init__(self, rwlock):
       
self.lock = threading.Lock()
       
self.rwlock = rwlock

   
def _release(self):
       
self.lock.release()

   
def acquire(self):
       
self.lock.acquire()


class RLock(SubLock):

   
def release(self):
       
self.rwlock._releaseR(self)


class WLock(SubLock):

   
def release(self):
       
self.rwlock._releaseW(self)



######### test_mrow.py #########


import threading
import time

from schevo.lib import mrow


def writer(L, value, after, rwlock, times):
   
"""Append value to L after a period of time."""
   
try:
       
lock = rwlock.writer()
       
# Get another lock, to test the fact that obtaining multiple
       
# write locks from the same thread context doesn't block (lock
       
# reentrancy).
        lock2
= rwlock.writer()
       
# Get a reader lock too; should be the same as getting another
       
# writer since writers are inherently readers as well.
        lock3
= rwlock.reader()
        times
.append(time.time())
        time
.sleep(after)
        L
.append(value)
   
finally:
        times
.append(time.time())
        lock3
.release()
        lock2
.release()
       
lock.release()


def reader(L1, L2, after, rwlock, times):
   
"""Append values from L1 to L2 after a period of time."""
   
try:
       
lock = rwlock.reader()
       
# Get another lock, to test the fact that obtaining multiple
       
# write locks from the same thread context doesn't block (lock
       
# reentrancy).
        lock2
= rwlock.reader()
        times
.append(time.time())
        time
.sleep(after)
        L2
.extend(L1)
   
finally:
        times
.append(time.time())
        lock2
.release()
       
lock.release()


def readerTurnedWriter(L, value, after, rwlock, times):
   
"""Append value to L after a period of time."""
   
try:
       
lock = rwlock.reader()
        lock2
= rwlock.writer()
        times
.append(time.time())
        time
.sleep(after)
        L
.append(value)
   
finally:
        times
.append(time.time())
        lock2
.release()
       
lock.release()


def test_reentrancy():
   
lock = mrow.RWLock()
   
# Reentrant read locks.
    rlock1
= lock.reader()
    rlock2
= lock.reader()
    rlock2
.release()
    rlock1
.release()
   
# Reentrant write locks.
    wlock1
= lock.writer()
    wlock2
= lock.writer()
    wlock2
.release()
    wlock1
.release()
   
# Writers are also readers.
    wlock
= lock.writer()
    rlock
= lock.reader()
    rlock
.release()
    wlock
.release()


def test_writeReadRead():
   
lock = mrow.RWLock()
    W
, R1, R2 = [], [], []
    TW
, TR1, TR2 = [], [], []
    thread1
= threading.Thread(
        target
=writer,
        args
=(W, 'foo', 0.2, lock, TW),
       
)
    thread2
= threading.Thread(
        target
=reader,
        args
=(W, R1, 0.2, lock, TR1),
       
)
    thread3
= threading.Thread(
        target
=reader,
        args
=(W, R2, 0.2, lock, TR2),
       
)
    thread1
.start()
    time
.sleep(0.1)
    thread2
.start()
    thread3
.start()
    time
.sleep(0.8)
   
assert 'foo' in R1
   
assert 'foo' in R2
   
assert TR1[0] <= TR2[1]             # Read 1 started during read 2.
   
assert TR2[0] <= TR1[1]             # Read 2 started during read 1.
   
assert TR1[0] >= TW[1]              # Read 1 started after write.
   
assert TR2[0] >= TW[1]              # Read 2 started after write.


def test_writeReadReadWrite():
   
lock = mrow.RWLock()
    W
, R1, R2 = [], [], []
    TW1
, TR1, TR2, TW2 = [], [], [], []
    thread1
= threading.Thread(
        target
=writer,
        args
=(W, 'foo', 0.3, lock, TW1),
       
)
    thread2
= threading.Thread(
        target
=reader,
        args
=(W, R1, 0.3, lock, TR1),
       
)
    thread3
= threading.Thread(
        target
=reader,
        args
=(W, R2, 0.3, lock, TR2),
       
)
    thread4
= threading.Thread(
        target
=writer,
        args
=(W, 'bar', 0.3, lock, TW2),
       
)
    thread1
.start()
    time
.sleep(0.1)
    thread2
.start()
    time
.sleep(0.1)
    thread3
.start()
    time
.sleep(0.1)
    thread4
.start()
    time
.sleep(1.7)
   
assert 'foo' in R1
   
assert 'foo' in R2
   
assert 'bar' not in R1
   
assert 'bar' not in R2
   
assert 'bar' in W
   
assert TR1[0] <= TR2[1]              # Read 1 started during read 2.
   
assert TR2[0] <= TR1[1]              # Read 2 started during read 1.
   
assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
   
assert TR2[0] >= TW1[1]              # Read 2 started after write 1.
   
assert TW2[0] >= TR1[1]              # Write 2 started after read 1.
   
assert TW2[0] >= TR2[1]              # Write 2 started after read 2.


def test_writeReadReadtowrite():
   
lock = mrow.RWLock()
    W
, R1 = [], []
    TW1
, TR1, TW2 = [], [], []
    thread1
= threading.Thread(
        target
=writer,
        args
=(W, 'foo', 0.3, lock, TW1),
       
)
    thread2
= threading.Thread(
        target
=reader,
        args
=(W, R1, 0.3, lock, TR1),
       
)
    thread3
= threading.Thread(
        target
=readerTurnedWriter,
        args
=(W, 'bar', 0.3, lock, TW2),
       
)
    thread1
.start()
    time
.sleep(0.1)
    thread2
.start()
    time
.sleep(0.1)
    thread3
.start()
    time
.sleep(1.7)
   
assert 'foo' in R1
   
assert 'bar' not in R1
   
assert 'bar' in W
   
assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
   
assert TW2[0] >= TR1[1]              # Write 2 started after read 1.

History

  • revision 2 (18 years ago)
  • previous revisions are not available