When programming with multiple threads, one sometimes needs a construct that can suspend the execution of a set of running threads. This is typically required by an outside thread that wants to pause the running threads while it performs a specific action. After the action, the threads should resume in the same order in which they were suspended.
A thread gate (or gateway) allows you to do this. It acts like a gate through which only one thread can pass at a time. By default the gate is open, allowing all threads to "enter" it. When a thread calls "close", the gate is closed, blocking any thread that subsequently calls "enter" until the gate is re-opened by the owner, whereupon the blocked threads resume in the order in which they were blocked.
The real-life parallel for this is a human-operated level crossing, which allows only one vehicle to pass at a time.
import threading
import time


class ThreadGateException(Exception):
    pass


class ThreadGate(object):
    """ A class which works as a FIFO 'gate' for threads.

    By default the gate is open, and any thread calling the
    'enter' method returns immediately.

    A thread can close the gate by calling the 'close' method.
    That thread becomes the owner of the gate. Any other thread
    calling 'enter' after this is automatically blocked till
    the owner re-opens the gate by calling 'open'.

    The gate requires a certain number of threads to block
    before the owner returns from the 'close' method. Otherwise
    the owner waits for a timeout and then returns from 'close'
    without actually closing the gate.

    The gate class can be used to block running threads during a
    particular operation, making sure that a fixed number of
    threads resume only after the operation is complete.
    """

    def __init__(self, numthreads, timeout=0):
        self.lock = threading.Lock()
        self.sem = threading.BoundedSemaphore(1)
        self.evt = threading.Event()
        self.count = 0
        self.owner_timeout = timeout
        self.owner = None
        self.nthreads = numthreads
        # Open by default
        self.position = 1

    def close(self):
        """ Close the gate. The calling thread becomes the owner
        of the gate and blocks till the requisite number of
        threads block on the gate or a timeout occurs, whichever
        happens first. It is an error if the gate is already
        closed. """
        if self.position == 0:
            # Already closed
            raise ThreadGateException("trying to close an already closed gate")
        try:
            self.lock.acquire()
            self.position = 0
            self.owner = threading.currentThread()
            self.sem.acquire()
        finally:
            self.lock.release()
        # Wait on the event till timeout
        self.evt.clear()
        self.evt.wait(self.owner_timeout)
        # If the event was set, the requisite number of threads
        # have blocked; otherwise reset the gate
        if not self.evt.isSet():
            try:
                print 'Owner thread timed out, re-setting gate'
                self.lock.acquire()
                self.position = 1
                self.owner = None
                self.sem.release()
            finally:
                self.lock.release()
            return -1
        return 0

    def open(self):
        """ Open the gate. The calling thread should be the
        owner of the gate. It is an error if the gate is
        already open. """
        if self.position == 1:
            # Already open
            raise ThreadGateException("trying to open an already opened gate")
        if threading.currentThread() != self.owner:
            raise ThreadGateException("not owner, cannot open gate")
        try:
            self.lock.acquire()
            self.position = 1
            self.owner = None
            self.sem.release()
        finally:
            self.lock.release()

    def enter(self):
        """ Enter the gate. If the gate is open, return
        immediately; otherwise block till the gate is opened
        by the owner. """
        if self.position == 1:
            return 0
        # Lock mutex and increment count
        try:
            self.lock.acquire()
            self.count += 1
            if self.count == self.nthreads:
                self.evt.set()
        finally:
            self.lock.release()
        ct = threading.currentThread()
        print 'Thread %s - Entering Gate' % ct.getName()
        try:
            # Will block here till the owner releases the semaphore
            self.sem.acquire()
            # Lock mutex and decrement count
            self.lock.acquire()
            self.count -= 1
        finally:
            self.lock.release()
            self.sem.release()
        print 'Thread %s - Exiting Gate' % ct.getName()

    def get_count(self):
        """ Return count of blocked threads """
        return self.count


def test():
    """ Test code """
    import random
    import Queue

    enterlog = Queue.Queue(0)
    exitlog = Queue.Queue(0)

    def enter(index):
        enterlog.put(index)

    def exit(index):
        exitlog.put(index)

    class OwnerThread(threading.Thread):
        """ Owner thread class for gate demo """

        def __init__(self, gate):
            self.gate = gate
            # Pass the thread name as a keyword argument; passing it
            # positionally here would set 'target' instead of 'name'
            threading.Thread.__init__(self, name='Owner thread')

        def run(self):
            # Close the gate
            print 'Closing gate...'
            ret = self.gate.close()
            if ret == 0:
                print 'Gate closed successfully'
            print 'Gate count=>', self.gate.get_count()
            # Open gate after sleeping some time
            time.sleep(5)
            if ret == 0:
                print 'Opening gate'
                self.gate.open()
            else:
                print 'Gate closing not successful'

    class SampleThread(threading.Thread):
        """ Sample thread class for gate demo """

        def __init__(self, index, gate):
            self.gate = gate
            self.index = index
            threading.Thread.__init__(self, name='Thread %d' % self.index)

        def run(self):
            # Sleep randomly
            time.sleep(random.choice(range(1, 10)))
            # Mark entry to gate
            enter(self.index)
            self.gate.enter()
            # Mark exit out of gate
            exit(self.index)

    def test1():
        """ Test code where the gate is closed successfully """
        print 'test1()...'
        gate = ThreadGate(10, 20)
        random.seed()
        print 'Starting threads...'
        # Create 1 owner thread and 10 sample threads
        threads = []
        threads.append(OwnerThread(gate))
        for x in range(10):
            threads.append(SampleThread(x, gate))
        # Start threads
        for x in range(11):
            threads[x].start()
        # Join with threads
        for x in range(11):
            threads[x].join()
        print 'Joined with threads'
        print 'Gate count=>', gate.get_count()

    def test2():
        """ Test code where the gate closing times out """
        print 'test2()...'
        gate = ThreadGate(10, 5)
        random.seed()
        print 'Starting threads...'
        # Create 1 owner thread and 10 sample threads
        threads = []
        threads.append(OwnerThread(gate))
        for x in range(10):
            threads.append(SampleThread(x, gate))
        # Start threads
        for x in range(11):
            threads[x].start()
        # Join with threads
        for x in range(11):
            threads[x].join()
        print 'Joined with threads'
        print 'Gate count=>', gate.get_count()

    test1()
    # The entry and exit logs must list the threads in the same order
    while not enterlog.empty():
        print enterlog.get(),
    print
    while not exitlog.empty():
        print exitlog.get(),
    print

    test2()
    while not enterlog.empty():
        print enterlog.get(),
    print
    while not exitlog.empty():
        print exitlog.get(),
    print


if __name__ == "__main__":
    test()
This construct is quite useful for suspending a thread or a thread pool at a specific point, preserving state for an operation that requires the threads not to change that state. Once the operation is complete, the threads are unblocked and resume in the order in which they were blocked.
I have used a semaphore and locks here; an alternative implementation could equally well use events or condition variables.
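As a sketch of that alternative (my own illustration, not part of the recipe), a simpler gate can be built on a single condition variable. This is written in Python 3, and the class and method names are hypothetical:

```python
import threading

class EventGate(object):
    """A simplified gate built on a Condition instead of a
    semaphore. Sketch only; it does not track an owner or a
    thread count the way the recipe's ThreadGate does."""

    def __init__(self):
        self.cond = threading.Condition()
        self.is_open = True

    def close(self):
        with self.cond:
            self.is_open = False

    def open(self):
        with self.cond:
            self.is_open = True
            self.cond.notify_all()   # wake every blocked thread

    def enter(self):
        with self.cond:
            while not self.is_open:  # guard against spurious wakeups
                self.cond.wait()

# Tiny demo: a worker blocks on the closed gate until the
# main thread opens it.
gate = EventGate()
gate.close()
results = []

def worker():
    gate.enter()
    results.append('passed')

t = threading.Thread(target=worker)
t.start()
gate.open()
t.join()
print(results)   # ['passed']
```

Note that notify_all() wakes the waiters in an arbitrary order, so unlike the recipe this sketch makes no FIFO claim.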
Thread barriers are similar constructs, but they have no concept of an owner and are used to synchronize threads at some point in the code. A barrier also does not ensure that threads resume in the order in which they joined. It has a pre-defined size, and unblocking is automatic once the last thread joins it.
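For reference, modern Python ships exactly such a construct as threading.Barrier (available since Python 3.2); a minimal example:

```python
import threading

arrival_order = []
order_lock = threading.Lock()
barrier = threading.Barrier(3)   # pre-defined size: 3 parties

def worker(n):
    with order_lock:
        arrival_order.append(n)
    barrier.wait()               # blocks until all 3 parties arrive

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(arrival_order))     # [0, 1, 2]
```

Unblocking is automatic once the last of the three parties calls wait(), and there is no owner thread; the order in which the released threads run afterwards is up to the scheduler.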
It is interesting to observe that threading/synchronization constructs have parallels with railroads. The concept of a semaphore originates from the real semaphores (vertical poles with metal flags) used in the 19th century to restrict train movement. It is amusing to see that thread gates are similar to manned, single-vehicle level crossings!
I don't understand some things in revision 2 of this code:
position (which I guess should be renamed and retyped, e.g. as a boolean "is_open") is never reset to 1, imho. Besides, there could be a race condition if position were reset to 1 in open() at the spot where "position = 0" now sits (I'd say it should happen before the sem.release()).
race condition in enter(): the test for "position == 0" is not atomic with the rest of your synchronization.
count is never decreased
If you protect the count increment with a mutex, you should also protect the read in get_count(); it would be cleaner.
I really doubt count would be accurate as it stands, even if it were decreased, because it is based on a synchronization completely isolated from the other sets of synchronizations. So I guess there would be transitory periods when it reports an inaccurate count.
One thing I need to think more about: I have the intuition that maybe two levels of locking (mutex / binary semaphore) are not needed here. But, well, I am not at all sure about that; for now I have no alternative to suggest.
I thought a little more about this synchronization. Here are some additional comments on rev 2 of the code.
First of all, I have to retract the last statement in my previous comment: imho a two-level synchronization seems inevitable. Worse, I think that enforcing the FIFO ordering requires one lock for each waiting thread — at least if we assume nothing about the scheduler (which is my position). If we can make a FIFO assumption about the scheduler, then I still think a two-level synchronization is needed, as you implemented, but there is no need for one lock per queued thread (a simple condition plus a binary semaphore should be enough). So, please reconsider the last two sentences of my previous comment.
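As an aside, the "one lock per waiting thread" idea can be sketched as follows (my own Python 3 illustration, not David's proposed code): each caller of enter() queues a private Event in arrival order, and open() sets them strictly FIFO:

```python
import threading
import time
from collections import deque

class FifoGate(object):
    """Sketch of a gate with one Event per queued thread,
    released in the order the threads arrived."""

    def __init__(self):
        self.lock = threading.Lock()
        self.waiters = deque()    # private Events, in arrival order
        self.is_open = True

    def close(self):
        with self.lock:
            self.is_open = False

    def open(self):
        with self.lock:
            self.is_open = True
            # Release the queued threads strictly in FIFO order
            while self.waiters:
                self.waiters.popleft().set()

    def enter(self):
        with self.lock:
            if self.is_open:
                return
            evt = threading.Event()
            self.waiters.append(evt)
        evt.wait()                # block until open() reaches our Event

# Demo: three threads queue up on the closed gate, then are released.
gate = FifoGate()
gate.close()
released = []
rel_lock = threading.Lock()

def worker(n):
    gate.enter()
    with rel_lock:
        released.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
time.sleep(0.5)                   # let the workers block on the gate
gate.open()
for t in threads:
    t.join()
print(sorted(released))           # [0, 1, 2]
```

Even here, setting the events in FIFO order only controls the wake-up order; the scheduler may still resume the threads on the CPU in a different order, which is why the demo checks membership rather than sequence.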
Second, and more importantly, the protocol for this gate synchronization does not seem really rigorous to me as it stands. Waking the threads up in FIFO order says nothing about the order in which they will actually resume on the CPU, unless appropriate mutual exclusion is applied when the threads resume execution: I see no guarantee that the scheduler is FIFO. That is probably why you put the part you wanted to be FIFO inside the synchronization itself (inside enter(): you put your call to sleep() there, to show it is FIFO), but nothing guarantees anything after enter(). So imho you might see cases where the order recorded in your exitlog is not the one effectively followed by the open() function.
That is why I think a synchronization with an explicit enter/leave protocol would be more rigorous here: at least you can guarantee some exclusion inside enter/leave, which can enforce a FIFO ordering with respect to the order in which the threads are woken up by open().
This also means that, after the open, the threads will be serialized (mutual exclusion)... which raises the question: what should happen if there are still threads in enter/leave, the gate is open, and new threads call enter? Do these new threads have to wait for the threads that are still in enter()? Or can they execute right away, thus bypassing the FIFO order?
Besides this, I still think the implementation is broken as it stands. For example, calling open() does not really re-open the gate: it just allows the blocked threads to be woken up. Any other thread arriving after an open() will be queued as if the gate were still closed. This was not the case before close() was called the first time (back then, the threads were not blocked at all).
Proposed implementation of the enter/leave protocol:
I didn't have enough space to add comments. To the question "what to do if there are still threads in enter/leave, the gate is open, and new threads call enter?", this implementation answers "they execute right away". If you want them queued after those that are still exiting, you can uncomment the:
Following the answer to the previous question, it means that threads entering after the open() will be woken up in an arbitrary order. For that matter, the check in main() will have to be updated to take into account the threads that were blocked by close() (hence the return value of enter()):
Please note that there is a race condition in this test: the assertion may fail, because enterlog is filled outside of the lock.
Hi david,
I have not read your comments fully yet, but here is a version which I think fixes at least one of the problems you mentioned, namely managing the thread count. It also adds a size check to the gate, which means the owner can proceed from "close" only after either a fixed number of threads join the gate or a timeout occurs. The test cases illustrate both situations.
Please give your valuable comments.
Btw, I don't think this is a FIFO any longer, because of the blocking semantics of the semaphore. Still, I have retained the title "Multithreaded FIFO gate".