Welcome, guest | Sign In | My Account | Store | Cart

In multithreaded apps, there is at times the need to control access to a resource to ensure data consistency and integrity. Multiple-reader, one-writer locking allows efficient read access by multiple threads, while ensuring that a write does not overlap any reads nor another write.

Python, 342 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
######### mrow.py #########

"""Multiple-reader-one-writer resource locking."""

from thread import get_ident
import threading


def acquireLocked(fn):
    def _acquireLocked_wrapper(self, *args, **kw):
        L = self.acquireLock
        try:
            L.acquire()
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _acquireLocked_wrapper


def releaseLocked(fn):
    def _releaseLocked_wrapper(self, *args, **kw):
        L = self.releaseLock
        try:
            L.acquire()
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _releaseLocked_wrapper


class RWLock(object):
    """MROW resource lock."""

    def __init__(self):
        self.acquireLock = threading.Lock()
        self.releaseLock = threading.Lock()
        self.sublocks = []
        self.waiting = []
        self.readers = 0
        self.writing = False
        self.threadReaders = {}
        self.threadWriters = {}

    def reader(self):
        """Return an acquired read lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        if ident in threadReaders:
            sublock, count = threadReaders[ident]
            threadReaders[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadWriters:
            # Writers are inherently readers, so treat as a reentrant
            # write lock.
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        sublock = RLock(self)
        if self.writing:
            # Wait for acquired writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.readers += 1
        self.sublocks.append(sublock)
        threadReaders[ident] = (sublock, 1)
        return sublock
    reader = acquireLocked(reader)

    def writer(self):
        """Return an acquired write lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        wasReader = False
        if ident in threadWriters:
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadReaders:
            # Readers-turned-writers must wait for any reads to complete
            # before turning into writers.
            sublock, count = threadReaders[ident]
            del threadReaders[ident]
            self.readers -= 1
            self.sublocks.remove(sublock)
            wasReader = True
        sublock = WLock(self)
        if self.readers or self.writing:
            # Wait for acquired readers/writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.writing = True
        self.sublocks.append(sublock)
        if not wasReader:
            count = 0
        threadWriters[ident] = (sublock, count + 1)
        return sublock
    writer = acquireLocked(writer)

    def _releaseR(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadReaders = self.threadReaders
            ident = get_ident()
            count = threadReaders[ident][1] - 1
            if count:
                threadReaders[ident] = (sublock, count)
            else:
                del threadReaders[ident]
                self.readers -= 1
                sublocks.remove(sublock)
                waiting = self.waiting
                if waiting and not self.readers:
                    # If a lock is waiting at this point, it is a write lock.
                    waiting.pop(0)._release()
    _releaseR = releaseLocked(_releaseR)

    def _releaseW(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadWriters = self.threadWriters
            ident = get_ident()
            count = threadWriters[ident][1] - 1
            if count:
                threadWriters[ident] = (sublock, count)
            else:
                del threadWriters[ident]
                self.writing = False
                sublocks.remove(sublock)
                waiting = self.waiting
                # Release any waiting read locks.
                while waiting and isinstance(waiting[0], RLock):
                    waiting.pop(0)._release()
    _releaseW = releaseLocked(_releaseW)


class SubLock(object):

    def __init__(self, rwlock):
        self.lock = threading.Lock()
        self.rwlock = rwlock

    def _release(self):
        self.lock.release()

    def acquire(self):
        self.lock.acquire()


class RLock(SubLock):

    def release(self):
        self.rwlock._releaseR(self)


class WLock(SubLock):

    def release(self):
        self.rwlock._releaseW(self)



######### test_mrow.py #########


import threading
import time

from schevo.lib import mrow


def writer(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.writer()
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.writer()
        # Get a reader lock too; should be the same as getting another
        # writer since writers are inherently readers as well.
        lock3 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock3.release()
        lock2.release()
        lock.release()


def reader(L1, L2, after, rwlock, times):
    """Append values from L1 to L2 after a period of time."""
    try:
        lock = rwlock.reader()
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L2.extend(L1)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def readerTurnedWriter(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.reader()
        lock2 = rwlock.writer()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def test_reentrancy():
    lock = mrow.RWLock()
    # Reentrant read locks.
    rlock1 = lock.reader()
    rlock2 = lock.reader()
    rlock2.release()
    rlock1.release()
    # Reentrant write locks.
    wlock1 = lock.writer()
    wlock2 = lock.writer()
    wlock2.release()
    wlock1.release()
    # Writers are also readers.
    wlock = lock.writer()
    rlock = lock.reader()
    rlock.release()
    wlock.release()


def test_writeReadRead():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW, TR1, TR2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.2, lock, TW),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.2, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.2, lock, TR2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    thread3.start()
    time.sleep(0.8)
    assert 'foo' in R1
    assert 'foo' in R2
    assert TR1[0] <= TR2[1]             # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]             # Read 2 started during read 1.
    assert TR1[0] >= TW[1]              # Read 1 started after write.
    assert TR2[0] >= TW[1]              # Read 2 started after write.


def test_writeReadReadWrite():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW1, TR1, TR2, TW2 = [], [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.3, lock, TR2),
        )
    thread4 = threading.Thread(
        target=writer,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(0.1)
    thread4.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'foo' in R2
    assert 'bar' not in R1
    assert 'bar' not in R2
    assert 'bar' in W
    assert TR1[0] <= TR2[1]              # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]              # Read 2 started during read 1.
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TR2[0] >= TW1[1]              # Read 2 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.
    assert TW2[0] >= TR2[1]              # Write 2 started after read 2.


def test_writeReadReadtowrite():
    lock = mrow.RWLock()
    W, R1 = [], []
    TW1, TR1, TW2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=readerTurnedWriter,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'bar' not in R1
    assert 'bar' in W
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.

After implementing a single read-write database locking mechanism in Schevo, Dominic Fox suggested that Schevo use a multiple-reader-one-writer (MROW) locking mechanism for greater efficiency during reads.

We needed a mechanism that had the following qualities:

  • robust

  • reentrant locks

  • simple interface

  • unit tested

This implementation implements at least the last three of these qualities, and seems to be sufficiently robust.

If anyone has any suggestions or patches that help improve this recipe, I'd love to incorporate them.

9 comments

Dominic Fox 16 years, 7 months ago  # | flag

Heh. Wrote one of these myself last week! See http://www.codepoetics.com/code/tripoli/synchronization.py.

Dominic Fox 16 years, 7 months ago  # | flag

Oh yeah, Like it says at the end there...

Incidentally, I will be looking at ways of writing unit tests for multi-threading components some time next week (I have a couple of tricks in mind, using the threadable function and AsyncResult class from http://www.codepoetics.com/code/tripoli/concurrency.py); MrowLockable will be the first candidate for testing.

Matthew Scott (author) 16 years, 7 months ago  # | flag

reentrancy. Your work definitely inspired this recipe; I thank you for that :)

Consider adding reentrancy to locks acquired in the same thread; for instance, calling outer() in an instance of the class below will deadlock:

class Foo(MrowLockable):

    @MrowLockable.writes
    def outer(self):
        print 'outer'
        self.inner()

    @MrowLockable.writes
    def inner(self):
        print 'inner'
Matthew Scott (author) 16 years, 7 months ago  # | flag

deadlock corner cases. I've updated the recipe to avoid some potential deadlock scenarios:

  • Threads that have acquired a write lock can subsequently acquire read locks, as writers are implicitly readers as well.

  • Threads that have acquired a read lock can subsequently acquire a write lock. The operation will block until existing read locks are released, then the read lock will morph into a write lock for the remainder of the lock acquisition's life span.

Caveat: The latter, while preventing a deadlock, might cause already-read data to become stale if a read lock morphs into a write lock after another write lock has been requested. I'll probably write a test case to illustrate this at some point.

Nicolas Lehuen 16 years, 7 months ago  # | flag

Check out Doug Lea's books and web site. A lot of inspiration can be taken from Doug Lea's work. First of all, there is the Java util.concurrent package (which has been integrated into Java 5) :

http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html

This package is explained in detail in a very good book, Concurrent Programming in Java :

http://www.amazon.com/exec/obidos/ASIN/0201310090/

Gerald Squelart 16 years, 7 months ago  # | flag

Lock-free structures. Just read an interesting article in C++ Journal recently, about lock-free structures, which are perfect for read-many-write-rarely and hence for MROW (as one writer cannot stumble onto itself - see code below, it's hopefully clear...).

(From memory) The basic idea is to use a pointer to a structure. Readers can read as much as they like, by reading the pointer, then going to the structure, without any locking. Writers make a copy of the structure, modify it, and then must atomically test-and-set the pointer to the structure.

This test-and-set operation was proved to be the most robust and efficient way to achieve lock-free resource access, and is implemented as one assembly instruction in many existing processors. In pure python, it would have to lock and would look something like this:

def test_and_set(pointer, expected_value, new_value):
    swapped = False
    lock() # Could lock globally or just on this pointer
    if pointer.pointee == expected_value:
        pointer.pointee = new_value
        swapped = True
    unlock()
    return swapped

And used like this (assuming 'pointer.pointee' is a structure):

# to read:
something = pointer.pointee.something # Look, no lock!

# to write:
while True:
    data_pointer = pointer.pointee # Record the current 'pointer'
    new_data = copy(data_pointer) # Copy the data
    new_data.something = 'bla' # ... Work on new_data
    if test_and_set(pointer, data_pointer, new_data):
        break # Successfully set.
    # Otherwise, try again

No time now to implement, please give it a go and tell me ;-)

The advantage in Python is that the discarded object will be garbage-collected. In non-GC C++, there's a lot of work to do!

It would be good to have this test_and_set as a native atomic operation!

Dominic Fox 16 years, 6 months ago  # | flag

Adding re-entrancy. I took a slightly different approach to re-entrancy in the end, adding a re-entrant wrapper to the non-reentrant lock class. This keeps the implementation of the basic lock class simple, and provides a fairly clean separation of concerns.

One other modification I made was to force any read request from a new thread to block if there were any write requests pending, so that readers could not starve writers of access.

The new code, together with some unit tests, is part of the "syncopated" module in Tripoli, which is maintained as a separate module and licensed under the Lesser GPL. See http://www.sourceforge.net/projects/tripoli for downloads.

marcus low 12 years, 7 months ago  # | flag

There is a bug with this recipe. The exception i reproduced was "error, releasing unacquire lock" on a simple code the reloads a configuration file (writer lock) while other threads uses the readerlock to read. After replacing it with recipe 502283 without making any code change on dependencies (same module name), my problem was resolved.

I replace my codes with : http://code.activestate.com/recipes/502283/ instead, adding in

class RWLock (ReadWriteLock) :

def reader (self) :
    self.acquireRead(timeout=10)
    return self

def writer (self) :
    self.acquireWrite()
    return self
Ivo Danihelka 10 years, 10 months ago  # | flag

Note that there is a proposal to add threading.RWLock to the standard library. It uses a different implementation.