Welcome, guest | Sign In | My Account | Store | Cart

During a recent application server design, I came across the need for a thread locking class which supports read and write locks (in the filesystem lock kind of way).

Python, 224 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# -*- coding: iso-8859-15 -*-
"""locks.py - Read-Write lock thread lock implementation

See the class documentation for more info.

Copyright (C) 2007, Heiko Wundram.
Released under the BSD-license.
"""

# Imports
# -------

from threading import Condition, Lock, currentThread
from time import time


# Read write lock
# ---------------

class ReadWriteLock(object):
    """Read-Write lock class. A read-write lock differs from a standard
    threading.RLock() by allowing multiple threads to simultaneously hold a
    read lock, while allowing only a single thread to hold a write lock at the
    same point of time.

    When a read lock is requested while a write lock is held, the reader
    is blocked; when a write lock is requested while another write lock is
    held or there are read locks, the writer is blocked.

    Writers are always preferred by this implementation: if there are blocked
    threads waiting for a write lock, current readers may request more read
    locks (which they eventually should free, as they starve the waiting
    writers otherwise), but a new thread requesting a read lock will not
    be granted one, and block. This might mean starvation for readers if
    two writer threads interweave their calls to acquireWrite() without
    leaving a window only for readers.

    In case a current reader requests a write lock, this can and will be
    satisfied without giving up the read locks first, but, only one thread
    may perform this kind of lock upgrade, as a deadlock would otherwise
    occur. After the write lock has been granted, the thread will hold a
    full write lock, and not be downgraded after the upgrading call to
    acquireWrite() has been match by a corresponding release().
    """

    def __init__(self):
        """Initialize this read-write lock."""

        # Condition variable, used to signal waiters of a change in object
        # state.
        self.__condition = Condition(Lock())

        # Initialize with no writers.
        self.__writer = None
        self.__upgradewritercount = 0
        self.__pendingwriters = []

        # Initialize with no readers.
        self.__readers = {}

    def acquireRead(self,timeout=None):
        """Acquire a read lock for the current thread, waiting at most
        timeout seconds or doing a non-blocking check in case timeout is <= 0.

        In case timeout is None, the call to acquireRead blocks until the
        lock request can be serviced.

        In case the timeout expires before the lock could be serviced, a
        RuntimeError is thrown."""

        if timeout is not None:
            endtime = time() + timeout
        me = currentThread()
        self.__condition.acquire()
        try:
            if self.__writer is me:
                # If we are the writer, grant a new read lock, always.
                self.__writercount += 1
                return
            while True:
                if self.__writer is None:
                    # Only test anything if there is no current writer.
                    if self.__upgradewritercount or self.__pendingwriters:
                        if me in self.__readers:
                            # Only grant a read lock if we already have one
                            # in case writers are waiting for their turn.
                            # This means that writers can't easily get starved
                            # (but see below, readers can).
                            self.__readers[me] += 1
                            return
                        # No, we aren't a reader (yet), wait for our turn.
                    else:
                        # Grant a new read lock, always, in case there are
                        # no pending writers (and no writer).
                        self.__readers[me] = self.__readers.get(me,0) + 1
                        return
                if timeout is not None:
                    remaining = endtime - time()
                    if remaining <= 0:
                        # Timeout has expired, signal caller of this.
                        raise RuntimeError("Acquiring read lock timed out")
                    self.__condition.wait(remaining)
                else:
                    self.__condition.wait()
        finally:
            self.__condition.release()

    def acquireWrite(self,timeout=None):
        """Acquire a write lock for the current thread, waiting at most
        timeout seconds or doing a non-blocking check in case timeout is <= 0.

        In case the write lock cannot be serviced due to the deadlock
        condition mentioned above, a ValueError is raised.

        In case timeout is None, the call to acquireWrite blocks until the
        lock request can be serviced.

        In case the timeout expires before the lock could be serviced, a
        RuntimeError is thrown."""

        if timeout is not None:
            endtime = time() + timeout
        me, upgradewriter = currentThread(), False
        self.__condition.acquire()
        try:
            if self.__writer is me:
                # If we are the writer, grant a new write lock, always.
                self.__writercount += 1
                return
            elif me in self.__readers:
                # If we are a reader, no need to add us to pendingwriters,
                # we get the upgradewriter slot.
                if self.__upgradewritercount:
                    # If we are a reader and want to upgrade, and someone
                    # else also wants to upgrade, there is no way we can do
                    # this except if one of us releases all his read locks.
                    # Signal this to user.
                    raise ValueError(
                        "Inevitable dead lock, denying write lock"
                        )
                upgradewriter = True
                self.__upgradewritercount = self.__readers.pop(me)
            else:
                # We aren't a reader, so add us to the pending writers queue
                # for synchronization with the readers.
                self.__pendingwriters.append(me)
            while True:
                if not self.__readers and self.__writer is None:
                    # Only test anything if there are no readers and writers.
                    if self.__upgradewritercount:
                        if upgradewriter:
                            # There is a writer to upgrade, and it's us. Take
                            # the write lock.
                            self.__writer = me
                            self.__writercount = self.__upgradewritercount + 1
                            self.__upgradewritercount = 0
                            return
                        # There is a writer to upgrade, but it's not us.
                        # Always leave the upgrade writer the advance slot,
                        # because he presumes he'll get a write lock directly
                        # from a previously held read lock.
                    elif self.__pendingwriters[0] is me:
                        # If there are no readers and writers, it's always
                        # fine for us to take the writer slot, removing us
                        # from the pending writers queue.
                        # This might mean starvation for readers, though.
                        self.__writer = me
                        self.__writercount = 1
                        self.__pendingwriters = self.__pendingwriters[1:]
                        return
                if timeout is not None:
                    remaining = endtime - time()
                    if remaining <= 0:
                        # Timeout has expired, signal caller of this.
                        if upgradewriter:
                            # Put us back on the reader queue. No need to
                            # signal anyone of this change, because no other
                            # writer could've taken our spot before we got
                            # here (because of remaining readers), as the test
                            # for proper conditions is at the start of the
                            # loop, not at the end.
                            self.__readers[me] = self.__upgradewritercount
                            self.__upgradewritercount = 0
                        else:
                            # We were a simple pending writer, just remove us
                            # from the FIFO list.
                            self.__pendingwriters.remove(me)
                        raise RuntimeError("Acquiring write lock timed out")
                    self.__condition.wait(remaining)
                else:
                    self.__condition.wait()
        finally:
            self.__condition.release()

    def release(self):
        """Release the currently held lock.

        In case the current thread holds no lock, a ValueError is thrown."""

        me = currentThread()
        self.__condition.acquire()
        try:
            if self.__writer is me:
                # We are the writer, take one nesting depth away.
                self.__writercount -= 1
                if not self.__writercount:
                    # No more write locks; take our writer position away and
                    # notify waiters of the new circumstances.
                    self.__writer = None
                    self.__condition.notifyAll()
            elif me in self.__readers:
                # We are a reader currently, take one nesting depth away.
                self.__readers[me] -= 1
                if not self.__readers[me]:
                    # No more read locks, take our reader position away.
                    del self.__readers[me]
                    if not self.__readers:
                        # No more readers, notify waiters of the new
                        # circumstances.
                        self.__condition.notifyAll()
            else:
                raise ValueError("Trying to release unheld lock")
        finally:
            self.__condition.release()

I needed the functionality that this class provides in a CherryPy-based webserver, which uses threading. Basically, I check for changed template files at a certain fixed interval in a daemon thread, having files be reloaded from disk in case the mtime doesn't match.

As template users (webserver threads) don't need mutual exclusion, but just the confidence that another thread isn't messing around with the template code environment they expect, they can share the state, just as the template checking thread doesn't need write access to the template code in case the mtime of the template file hasn't changed.

Basically what evolved was code of the following sorts:

tmpl_lock = ReadWriteLock()

Reader:

tmpl_lock.acquireRead() try:

finally: tmpl_lock.release()

Reloader:

tmpl_lock.acquireRead() try: : tmpl_lock.acquireWrite() try:

    finally:
        tmpl_lock.release()

finally: tmpl_lock.release()

The writer case can never enter the deadlock condition described in the module source, as there is only a single writer thread which does an upgrade.

The actual class is pretty close to what Tanenbaum describes as reader/writer locking in "Operating Systems - Design and Implementation" in the chapter on processes, only that it's implemented in Python for Python thread synchronization.

The usual caveats apply, such as reader starvation if several writers overlap, and spurious ValueErrors (because of inevitable deadlock) when you have several different threads do "state upgrades". The first kind of problem, total reader starvation, could be resolved, but for my use case, that's not a priority.

The class hasn't been heavily tested, but works for me (TM).

3 comments

Jim Pryor 16 years ago  # | flag

Wonderful. This is great code. I had been using a class I based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66426, but this nicely handles a bunch of limitations on re-entrancy I was coming up against.

Two suggestions. First, make the signatures of the acquire methods take two arguments:

def acquireRead(self, blocking=True, timeout=True)

to stay closer to the calling conventions of the threading model. Then you can just replace your initial test on timeout with:

if not blocking:
    endtime = -1
elif timeout is not None:
    endtime = time() + timeout
else:
    endtime = None

and at the end of the acquires methods, replace "if timeout is not None" with "if endtime is not None".

Second, add in the following helper methods:

from contextlib import contextmanager
@property
@contextmanager
def readlock(self):
    self.acquireRead()
    try:
        yield
    finally:
        self.release()

and similarly for writelock/acquireWrite.

Then you can write code like this:

from __future__ import with_statement
lock = ReadWriteLock()
with lock.readlock:
    pass # do stuff with lock
Jim Pryor 16 years ago  # | flag

Also, shouldn't this be in the Threads category?

Steve Anderson 12 years, 6 months ago  # | flag

I'm looking at using this recipe in a project I'm working on. I'd like to modify it to not always raise a ValueError exception in the multiple state upgrade deadlock scenario. If the acquireWrite call has a timeout set, I think it should instead raise a RuntimeError to indicate a timeout. While the timeout may not have actually expired, in theory it would deadlock until it did expire, then it would give up and the deadlock would be broken. So rather than causing a deadlock exception, I'd like to cause a more manageable timeout exception if the caller indicated (by setting a timeout) that they are prepared to gracefully handle that scenario.

Would it be as simple as:

elif me in self.__readers:
    # If we are a reader, no need to add us to pendingwriters,
    # we get the upgradewriter slot.
    if self.__upgradewritercount:
        # If we are a reader and want to upgrade, and someone
        # else also wants to upgrade, there is no way we can do
        # this except if one of us releases all his read locks.
        # Signal this to user.
        if timeout is not None:
            raise RuntimeError("Write lock upgrade would deadlock until timeout")
        else:
            raise ValueError("Inevitable dead lock, denying write lock")