Welcome, guest | Sign In | My Account | Store | Cart
######### mrow.py #########

"""Multiple-reader-one-writer resource locking."""

from thread import get_ident
import threading


def acquireLocked(fn):
    def _acquireLocked_wrapper(self, *args, **kw):
        L = self.acquireLock
        try:
            L.acquire()
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _acquireLocked_wrapper


def releaseLocked(fn):
    def _releaseLocked_wrapper(self, *args, **kw):
        L = self.releaseLock
        try:
            L.acquire()
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _releaseLocked_wrapper


class RWLock(object):
    """MROW resource lock."""

    def __init__(self):
        self.acquireLock = threading.Lock()
        self.releaseLock = threading.Lock()
        self.sublocks = []
        self.waiting = []
        self.readers = 0
        self.writing = False
        self.threadReaders = {}
        self.threadWriters = {}

    def reader(self):
        """Return an acquired read lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        if ident in threadReaders:
            sublock, count = threadReaders[ident]
            threadReaders[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadWriters:
            # Writers are inherently readers, so treat as a reentrant
            # write lock.
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        sublock = RLock(self)
        if self.writing:
            # Wait for acquired writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.readers += 1
        self.sublocks.append(sublock)
        threadReaders[ident] = (sublock, 1)
        return sublock
    reader = acquireLocked(reader)

    def writer(self):
        """Return an acquired write lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        wasReader = False
        if ident in threadWriters:
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadReaders:
            # Readers-turned-writers must wait for any reads to complete
            # before turning into writers.
            sublock, count = threadReaders[ident]
            del threadReaders[ident]
            self.readers -= 1
            self.sublocks.remove(sublock)
            wasReader = True
        sublock = WLock(self)
        if self.readers or self.writing:
            # Wait for acquired readers/writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.writing = True
        self.sublocks.append(sublock)
        if not wasReader:
            count = 0
        threadWriters[ident] = (sublock, count + 1)
        return sublock
    writer = acquireLocked(writer)

    def _releaseR(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadReaders = self.threadReaders
            ident = get_ident()
            count = threadReaders[ident][1] - 1
            if count:
                threadReaders[ident] = (sublock, count)
            else:
                del threadReaders[ident]
                self.readers -= 1
                sublocks.remove(sublock)
                waiting = self.waiting
                if waiting and not self.readers:
                    # If a lock is waiting at this point, it is a write lock.
                    waiting.pop(0)._release()
    _releaseR = releaseLocked(_releaseR)

    def _releaseW(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadWriters = self.threadWriters
            ident = get_ident()
            count = threadWriters[ident][1] - 1
            if count:
                threadWriters[ident] = (sublock, count)
            else:
                del threadWriters[ident]
                self.writing = False
                sublocks.remove(sublock)
                waiting = self.waiting
                # Release any waiting read locks.
                while waiting and isinstance(waiting[0], RLock):
                    waiting.pop(0)._release()
    _releaseW = releaseLocked(_releaseW)


class SubLock(object):

    def __init__(self, rwlock):
        self.lock = threading.Lock()
        self.rwlock = rwlock

    def _release(self):
        self.lock.release()

    def acquire(self):
        self.lock.acquire()


class RLock(SubLock):

    def release(self):
        self.rwlock._releaseR(self)


class WLock(SubLock):

    def release(self):
        self.rwlock._releaseW(self)



######### test_mrow.py #########


import threading
import time

from schevo.lib import mrow


def writer(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.writer()
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.writer()
        # Get a reader lock too; should be the same as getting another
        # writer since writers are inherently readers as well.
        lock3 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock3.release()
        lock2.release()
        lock.release()


def reader(L1, L2, after, rwlock, times):
    """Append values from L1 to L2 after a period of time."""
    try:
        lock = rwlock.reader()
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L2.extend(L1)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def readerTurnedWriter(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.reader()
        lock2 = rwlock.writer()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def test_reentrancy():
    lock = mrow.RWLock()
    # Reentrant read locks.
    rlock1 = lock.reader()
    rlock2 = lock.reader()
    rlock2.release()
    rlock1.release()
    # Reentrant write locks.
    wlock1 = lock.writer()
    wlock2 = lock.writer()
    wlock2.release()
    wlock1.release()
    # Writers are also readers.
    wlock = lock.writer()
    rlock = lock.reader()
    rlock.release()
    wlock.release()


def test_writeReadRead():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW, TR1, TR2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.2, lock, TW),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.2, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.2, lock, TR2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    thread3.start()
    time.sleep(0.8)
    assert 'foo' in R1
    assert 'foo' in R2
    assert TR1[0] <= TR2[1]             # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]             # Read 2 started during read 1.
    assert TR1[0] >= TW[1]              # Read 1 started after write.
    assert TR2[0] >= TW[1]              # Read 2 started after write.


def test_writeReadReadWrite():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW1, TR1, TR2, TW2 = [], [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.3, lock, TR2),
        )
    thread4 = threading.Thread(
        target=writer,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(0.1)
    thread4.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'foo' in R2
    assert 'bar' not in R1
    assert 'bar' not in R2
    assert 'bar' in W
    assert TR1[0] <= TR2[1]              # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]              # Read 2 started during read 1.
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TR2[0] >= TW1[1]              # Read 2 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.
    assert TW2[0] >= TR2[1]              # Write 2 started after read 2.


def test_writeReadReadtowrite():
    lock = mrow.RWLock()
    W, R1 = [], []
    TW1, TR1, TW2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=readerTurnedWriter,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'bar' not in R1
    assert 'bar' in W
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.

History

  • revision 2 (18 years ago)
  • previous revisions are not available