Multithreaded applications sometimes need to control access to a shared resource to keep its data consistent. Multiple-reader, one-writer (MROW) locking lets many threads read concurrently while ensuring that a write never overlaps a read or another write.
######### mrow.py #########
"""Multiple-reader-one-writer resource locking."""

try:
    from threading import get_ident  # Python 3.3+
except ImportError:
    from thread import get_ident  # Python 2
import threading


def acquireLocked(fn):
    """Run fn while holding self.acquireLock."""
    def _acquireLocked_wrapper(self, *args, **kw):
        L = self.acquireLock
        # Acquire before entering the try block so that a failed
        # acquire is never followed by a release.
        L.acquire()
        try:
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _acquireLocked_wrapper


def releaseLocked(fn):
    """Run fn while holding self.releaseLock."""
    def _releaseLocked_wrapper(self, *args, **kw):
        L = self.releaseLock
        L.acquire()
        try:
            result = fn(self, *args, **kw)
        finally:
            L.release()
        return result
    return _releaseLocked_wrapper


class RWLock(object):
    """MROW resource lock."""

    def __init__(self):
        self.acquireLock = threading.Lock()
        self.releaseLock = threading.Lock()
        self.sublocks = []
        self.waiting = []
        self.readers = 0
        self.writing = False
        # Map thread ident -> (sublock, reentrancy count).
        self.threadReaders = {}
        self.threadWriters = {}

    def reader(self):
        """Return an acquired read lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        if ident in threadReaders:
            sublock, count = threadReaders[ident]
            threadReaders[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadWriters:
            # Writers are inherently readers, so treat as a reentrant
            # write lock.
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        sublock = RLock(self)
        if self.writing:
            # Wait for acquired writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.readers += 1
        self.sublocks.append(sublock)
        threadReaders[ident] = (sublock, 1)
        return sublock
    reader = acquireLocked(reader)

    def writer(self):
        """Return an acquired write lock."""
        threadReaders, threadWriters = self.threadReaders, self.threadWriters
        ident = get_ident()
        wasReader = False
        if ident in threadWriters:
            sublock, count = threadWriters[ident]
            threadWriters[ident] = (sublock, count + 1)
            return sublock
        elif ident in threadReaders:
            # Readers-turned-writers must wait for any reads to complete
            # before turning into writers.
            sublock, count = threadReaders[ident]
            del threadReaders[ident]
            self.readers -= 1
            self.sublocks.remove(sublock)
            wasReader = True
        sublock = WLock(self)
        if self.readers or self.writing:
            # Wait for acquired readers/writers to release.
            self.waiting.append(sublock)
            sublock.acquire()
        sublock.acquire()
        self.writing = True
        self.sublocks.append(sublock)
        if not wasReader:
            count = 0
        threadWriters[ident] = (sublock, count + 1)
        return sublock
    writer = acquireLocked(writer)

    def _releaseR(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadReaders = self.threadReaders
            ident = get_ident()
            count = threadReaders[ident][1] - 1
            if count:
                threadReaders[ident] = (sublock, count)
            else:
                del threadReaders[ident]
                self.readers -= 1
                sublocks.remove(sublock)
                waiting = self.waiting
                if waiting and not self.readers:
                    # If a lock is waiting at this point, it is a write lock.
                    waiting.pop(0)._release()
    _releaseR = releaseLocked(_releaseR)

    def _releaseW(self, sublock):
        sublocks = self.sublocks
        if sublock in sublocks:
            threadWriters = self.threadWriters
            ident = get_ident()
            count = threadWriters[ident][1] - 1
            if count:
                threadWriters[ident] = (sublock, count)
            else:
                del threadWriters[ident]
                self.writing = False
                sublocks.remove(sublock)
                waiting = self.waiting
                # Release any waiting read locks.
                while waiting and isinstance(waiting[0], RLock):
                    waiting.pop(0)._release()
    _releaseW = releaseLocked(_releaseW)


class SubLock(object):

    def __init__(self, rwlock):
        self.lock = threading.Lock()
        self.rwlock = rwlock

    def _release(self):
        self.lock.release()

    def acquire(self):
        self.lock.acquire()


class RLock(SubLock):

    def release(self):
        self.rwlock._releaseR(self)


class WLock(SubLock):

    def release(self):
        self.rwlock._releaseW(self)

######### test_mrow.py #########
import threading
import time

from schevo.lib import mrow


def writer(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.writer()
        # Get another lock, to test the fact that obtaining multiple
        # write locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.writer()
        # Get a reader lock too; should be the same as getting another
        # writer since writers are inherently readers as well.
        lock3 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock3.release()
        lock2.release()
        lock.release()


def reader(L1, L2, after, rwlock, times):
    """Append values from L1 to L2 after a period of time."""
    try:
        lock = rwlock.reader()
        # Get another lock, to test the fact that obtaining multiple
        # read locks from the same thread context doesn't block (lock
        # reentrancy).
        lock2 = rwlock.reader()
        times.append(time.time())
        time.sleep(after)
        L2.extend(L1)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def readerTurnedWriter(L, value, after, rwlock, times):
    """Append value to L after a period of time."""
    try:
        lock = rwlock.reader()
        lock2 = rwlock.writer()
        times.append(time.time())
        time.sleep(after)
        L.append(value)
    finally:
        times.append(time.time())
        lock2.release()
        lock.release()


def test_reentrancy():
    lock = mrow.RWLock()
    # Reentrant read locks.
    rlock1 = lock.reader()
    rlock2 = lock.reader()
    rlock2.release()
    rlock1.release()
    # Reentrant write locks.
    wlock1 = lock.writer()
    wlock2 = lock.writer()
    wlock2.release()
    wlock1.release()
    # Writers are also readers.
    wlock = lock.writer()
    rlock = lock.reader()
    rlock.release()
    wlock.release()


def test_writeReadRead():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW, TR1, TR2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.2, lock, TW),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.2, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.2, lock, TR2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    thread3.start()
    time.sleep(0.8)
    assert 'foo' in R1
    assert 'foo' in R2
    assert TR1[0] <= TR2[1]             # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]             # Read 2 started during read 1.
    assert TR1[0] >= TW[1]              # Read 1 started after write.
    assert TR2[0] >= TW[1]              # Read 2 started after write.


def test_writeReadReadWrite():
    lock = mrow.RWLock()
    W, R1, R2 = [], [], []
    TW1, TR1, TR2, TW2 = [], [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=reader,
        args=(W, R2, 0.3, lock, TR2),
        )
    thread4 = threading.Thread(
        target=writer,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(0.1)
    thread4.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'foo' in R2
    assert 'bar' not in R1
    assert 'bar' not in R2
    assert 'bar' in W
    assert TR1[0] <= TR2[1]              # Read 1 started during read 2.
    assert TR2[0] <= TR1[1]              # Read 2 started during read 1.
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TR2[0] >= TW1[1]              # Read 2 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.
    assert TW2[0] >= TR2[1]              # Write 2 started after read 2.


def test_writeReadReadtowrite():
    lock = mrow.RWLock()
    W, R1 = [], []
    TW1, TR1, TW2 = [], [], []
    thread1 = threading.Thread(
        target=writer,
        args=(W, 'foo', 0.3, lock, TW1),
        )
    thread2 = threading.Thread(
        target=reader,
        args=(W, R1, 0.3, lock, TR1),
        )
    thread3 = threading.Thread(
        target=readerTurnedWriter,
        args=(W, 'bar', 0.3, lock, TW2),
        )
    thread1.start()
    time.sleep(0.1)
    thread2.start()
    time.sleep(0.1)
    thread3.start()
    time.sleep(1.7)
    assert 'foo' in R1
    assert 'bar' not in R1
    assert 'bar' in W
    assert TR1[0] >= TW1[1]              # Read 1 started after write 1.
    assert TW2[0] >= TR1[1]              # Write 2 started after read 1.

After I implemented a single read-write database locking mechanism in Schevo, Dominic Fox suggested that Schevo use a multiple-reader-one-writer (MROW) locking mechanism for greater efficiency during reads.
We needed a mechanism that had the following qualities:
robust
reentrant locks
simple interface
unit tested
This implementation provides at least the last three of these qualities, and seems to be sufficiently robust.
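The try/finally pattern that callers need around reader() and writer() could be wrapped in context managers. A minimal sketch, assuming hypothetical helper names `reading` and `writing` that are not part of the recipe, and relying only on the recipe's interface (reader() and writer() return an already-acquired sublock that has a release() method):

```python
from contextlib import contextmanager


@contextmanager
def reading(rwlock):
    """Hold a read lock from rwlock for the duration of a with-block."""
    sublock = rwlock.reader()  # returned already acquired
    try:
        yield sublock
    finally:
        sublock.release()


@contextmanager
def writing(rwlock):
    """Hold a write lock from rwlock for the duration of a with-block."""
    sublock = rwlock.writer()  # returned already acquired
    try:
        yield sublock
    finally:
        sublock.release()
```

With these helpers a caller would write `with reading(lock): ...` or `with writing(lock): ...`, and the sublock is released even if the body raises.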
If anyone has any suggestions or patches that help improve this recipe, I'd love to incorporate them.
Heh. Wrote one of these myself last week! See http://www.codepoetics.com/code/tripoli/synchronization.py.
Oh yeah, like it says at the end there...
Incidentally, I will be looking at ways of writing unit tests for multi-threading components some time next week (I have a couple of tricks in mind, using the threadable function and AsyncResult class from http://www.codepoetics.com/code/tripoli/concurrency.py); MrowLockable will be the first candidate for testing.
Reentrancy. Your work definitely inspired this recipe; I thank you for that :)
Consider adding reentrancy to locks acquired in the same thread; for instance, calling outer() in an instance of the class below will deadlock:
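A minimal sketch of the kind of class this describes, assuming a hypothetical `Resource` whose methods share one plain, non-reentrant `threading.Lock` (an illustration only, not the commenter's original code and not the recipe's RWLock). It uses the Python 3 `timeout` argument so that the demonstration reports the would-be deadlock instead of hanging:

```python
import threading


class Resource(object):
    """Hypothetical class whose methods all take the same non-reentrant lock."""

    def __init__(self):
        self.lock = threading.Lock()

    def inner(self):
        # Without the timeout, a nested call would block here forever.
        if not self.lock.acquire(timeout=0.1):
            return "deadlock"
        try:
            return "ok"
        finally:
            self.lock.release()

    def outer(self):
        self.lock.acquire()
        try:
            # inner() asks for a lock that this thread already holds.
            return self.inner()
        finally:
            self.lock.release()
```

Calling `inner()` on its own succeeds; calling `outer()` makes the same thread request the lock twice, which is the case that per-thread reentrancy is meant to handle.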
Deadlock corner cases. I've updated the recipe to avoid some potential deadlock scenarios:
Threads that have acquired a write lock can subsequently acquire read locks, as writers are implicitly readers as well.
Threads that have acquired a read lock can subsequently acquire a write lock. The operation will block until existing read locks are released, then the read lock will morph into a write lock for the remainder of the lock acquisition's life span.
Caveat: The latter, while preventing a deadlock, might cause already-read data to become stale if a read lock morphs into a write lock after another write lock has been requested. I'll probably write a test case to illustrate this at some point.
Check out Doug Lea's books and web site. A lot of inspiration can be taken from Doug Lea's work. First of all, there is the Java util.concurrent package (which has been integrated into Java 5):
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
This package is explained in detail in a very good book, Concurrent Programming in Java:
http://www.amazon.com/exec/obidos/ASIN/0201310090/
Lock-free structures. Just read an interesting article in C++ Journal recently, about lock-free structures, which are perfect for read-many-write-rarely and hence for MROW (as one writer cannot stumble onto itself - see code below, it's hopefully clear...).
(From memory) The basic idea is to use a pointer to a structure. Readers can read as much as they like, by reading the pointer, then going to the structure, without any locking. Writers make a copy of the structure, modify it, and then must atomically test-and-set the pointer to the structure.
This test-and-set operation was proved to be the most robust and efficient way to achieve lock-free resource access, and is implemented as one assembly instruction in many existing processors. In pure Python, it would have to lock and would look something like this:
And used like this (assuming 'pointer.pointee' is a structure):
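A minimal sketch of both pieces, the locked test-and-set and its use in a copy-modify-swap loop. The `Pointer` class, the `read` and `update` helpers, and the choice of a dict as the structure are all assumptions made for illustration; the internal lock only stands in for the atomic instruction the comment refers to:

```python
import threading


class Pointer(object):
    """Hypothetical holder for a reference to a structure that is never
    modified in place once published."""

    def __init__(self, pointee):
        self.pointee = pointee
        self._lock = threading.Lock()  # stands in for a hardware atomic

    def test_and_set(self, expected, new):
        """Install new only if the pointer still refers to expected."""
        with self._lock:
            if self.pointee is expected:
                self.pointee = new
                return True
            return False


def read(pointer):
    # Readers take no lock: they grab the current structure and use it.
    return pointer.pointee


def update(pointer, modify):
    # Writers copy the structure, modify the copy, then try to swap it
    # in. If another writer got there first, retry from the newer one.
    while True:
        old = pointer.pointee
        new = dict(old)
        modify(new)
        if pointer.test_and_set(old, new):
            return new
```

A reader that fetched the structure before an update keeps a consistent, if stale, snapshot, because writers never touch a published structure.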
No time now to implement, please give it a go and tell me ;-)
The advantage in Python is that the discarded object will be garbage-collected. In non-GC C++, there's a lot of work to do!
It would be good to have this test_and_set as a native atomic operation!
Adding re-entrancy. I took a slightly different approach to re-entrancy in the end, adding a re-entrant wrapper to the non-reentrant lock class. This keeps the implementation of the basic lock class simple, and provides a fairly clean separation of concerns.
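The wrapper idea can be sketched as follows. This is not the Tripoli code; `ReentrantWrapper` is a hypothetical name, and the sketch wraps a plain `threading.Lock` rather than a read-write lock to keep it short. A per-thread depth counter decides when the underlying lock is really touched:

```python
import threading


class ReentrantWrapper(object):
    """Hypothetical wrapper adding per-thread reentrancy to a plain lock."""

    def __init__(self, lock=None):
        self._lock = lock if lock is not None else threading.Lock()
        self._local = threading.local()  # per-thread nesting depth

    def acquire(self):
        depth = getattr(self._local, "depth", 0)
        if depth == 0:
            # Only the outermost acquire touches the real lock.
            self._lock.acquire()
        self._local.depth = depth + 1

    def release(self):
        depth = getattr(self._local, "depth", 0)
        if depth == 0:
            raise RuntimeError("release() without a matching acquire()")
        self._local.depth = depth - 1
        if depth == 1:
            # The outermost release hands the real lock back.
            self._lock.release()
```

The wrapped lock stays unaware of reentrancy, which is the separation of concerns the comment describes.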
One other modification I made was to force any read request from a new thread to block if there were any write requests pending, so that readers could not starve writers of access.
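A minimal sketch of that policy, assuming a hypothetical, non-reentrant `WriterPreferringRWLock` built on `threading.Condition` (again, not the Tripoli implementation). New readers wait whenever a writer is active or queued:

```python
import threading


class WriterPreferringRWLock(object):
    """Hypothetical MROW lock in which pending writers block new readers."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0          # readers currently holding the lock
        self._writing = False      # True while a writer holds the lock
        self._writers_waiting = 0  # writers queued for access

    def acquire_read(self):
        with self._cond:
            # Block while a writer is active or pending, so a steady
            # stream of readers cannot starve writers.
            while self._writing or self._writers_waiting:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            self._writers_waiting += 1
            try:
                while self._writing or self._readers:
                    self._cond.wait()
            finally:
                self._writers_waiting -= 1
            self._writing = True

    def release_write(self):
        with self._cond:
            self._writing = False
            self._cond.notify_all()
```

The trade-off is the reverse risk: a steady stream of writers can now delay readers indefinitely.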
The new code, together with some unit tests, is part of the "syncopated" module in Tripoli, which is maintained as a separate module and licensed under the Lesser GPL. See http://www.sourceforge.net/projects/tripoli for downloads.
There is a bug in this recipe. The exception I reproduced was "error, releasing unacquire lock", in simple code that reloads a configuration file (writer lock) while other threads use the reader lock to read. After I replaced it with recipe 502283, without changing any dependent code (same module name), my problem was resolved.
I replaced my code with http://code.activestate.com/recipes/502283/ instead, adding:
class RWLock (ReadWriteLock) :
Note that there is a proposal to add threading.RWLock to the standard library. It uses a different implementation.