Welcome, guest | Sign In | My Account | Store | Cart

While programming with multiple threads, sometimes one needs a construct which allows to suspend the execution of a set of running threads. This is normally required by an outside thread which wants to suspend the running threads for performing a specific action. The threads need to resume after the action in the same order in which they got suspended.

A thread gate (or gateway) allows you to do this. It acts like a gate through which only one thread can pass at a time. By default the gate is open, allowing all threads to "enter" the gate. When a thread calls "close", the gate is closed, blocking any threads which make a further call to "enter", till the gate is re-opened by the owner, whence the threads resume the order in which they got blocked.

The real-life parallel for this is a human operated level cross, which allows only one vehicle to pass at a time.

Python, 291 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import threading
import time

class ThreadGateException(Exception):
    pass

class ThreadGate(object):
    """ A class which works as a FIFO 'gate' for threads. By
    default the gate is open and any thread calling on the
    'enter' method returns immediately.

    A thread can 'close' the gate by calling the method 'close'.
    This thread becomes the owner of the gate. Any other thread
    calling 'enter' after this is automatically blocked till
    the owner calls reopens the gate by calling 'open'.

    The gate requires a certain number of threads to block
    before the owner exits from the 'close' method. Otherwise,
    the owner waits for a timeout before returning from the 'close'
    method, without actually closing the gate.
    
    The gate class can be used to block running threads for a 
    particular operation and making sure that they resume after
    the operation is complete, for a fixed number of threads.
    """

    def __init__(self, numthreads, timeout=0):
        self.lock = threading.Lock()
        self.sem = threading.BoundedSemaphore(1)
        self.evt = threading.Event()
        self.count = 0
        self.owner_timeout = timeout
        self.owner = None
        self.nthreads = numthreads
        # Open by default
        self.position = 1

    def close(self):
        """ Close the gate. The calling thread
        becomes the owner of the gate and blocks
        till the requisite number of threads block
        on the gate or a timeout occurs, whichever
        is first.

        It is an error if the gate is already closed """

        if self.position == 0:
            # Already closed
            raise ThreadGateException,"trying to close an already closed gate"

        try:
            self.lock.acquire()
            self.position = 0
            self.owner = threading.currentThread()
            self.sem.acquire()
        finally:
            self.lock.release()

        # Wait on the event till timeout
        self.evt.clear()
        self.evt.wait(self.owner_timeout)
        
        # If event was set, requisite number off
        # threads have blocked, else reset the gate
        if not self.evt.isSet():
            try:
                print 'Owner thread timedout, re-setting gate'
                self.lock.acquire()
                self.position = 1
                self.owner = None
                self.sem.release()
            finally:
                self.lock.release()

            return -1
            
        return 0

        
    def open(self):
        """ Open the gate. The calling thread should
        be the owner of the gate. It is an error if
        the gate is already open """

        if self.position == 1:
            # Already open
            raise ThreadGateException,"trying to open an already opened gate"

        if threading.currentThread() != self.owner:
            raise ThreadGateException,"not owner, cannot open gate"
            
        try:
            self.lock.acquire()
            self.position = 1
            self.owner = None
            self.sem.release()
        finally:
            self.lock.release()
            
    def enter(self):
        """ Enter the gate. If the gate is open, returns
        immediately, else gets blocked till the gate is
        opened by the owner """

        if self.position==1:
            return 0

        # Lock mutex and increment count
        try:
            self.lock.acquire()
            self.count += 1
            if self.count==self.nthreads:
                self.evt.set()
        finally:
            self.lock.release()

        ct = threading.currentThread()
        print 'Thread %s - Entering Gate' % (ct.getName())

        # Lock mutex and decrement count
        try:
            # Will block here
            self.sem.acquire()
            self.lock.acquire()
            self.count -= 1
        finally:
            self.lock.release()
            self.sem.release()
            
        print 'Thread %s - Exiting Gate' % (ct.getName())
        
    def get_count(self):
        """ Return count of blocked threads """

        return self.count


def test():
    """ Test code """

    import random
    import Queue

    enterlog = Queue.Queue(0)
    exitlog = Queue.Queue(0)

    def enter(index):
        enterlog.put(index)

    def exit(index):
        exitlog.put(index)


    class OwnerThread(threading.Thread):
        """ Owner thread class for gate demo """

        def __init__(self, gate):
            self.gate = gate
            threading.Thread.__init__(self, None, 'Owner thread')
            
        def run(self):
            # Close the gate
            print 'Closing gate...'
            ret = self.gate.close()
            if ret==0:
                print 'Gate closed successfully'

            print 'Gate count=>',self.gate.get_count()
                
            # Open gate after sleeping some time
            time.sleep(5)
            if ret==0:
                print 'Opening gate'
                self.gate.open()
            else:
                print 'Gate closing not successful'

            
    class SampleThread(threading.Thread):
        """ Sample thread class for gate demo """
        
        def __init__(self, index, gate):
            self.gate = gate
            self.index = index
            threading.Thread.__init__(self, None, 'Thread %d' % self.index, None) 
                                      
        def run(self):

            # Sleep randomly
            time.sleep(random.choice(range(1,10)))
            # Mark entry to gate
            enter(self.index)
            self.gate.enter()
            # Mark exit out of gate
            exit(self.index)


    def test1():
        """ Test code where gate is closed successfully """

        print 'test1()...'

        gate = ThreadGate(10, 20)
        
        random.seed()
        
        print 'Starting threads...'
        # Create 10 threads
        threads = []
        threads.append(OwnerThread(gate))
        
        for x in range(10):
            threads.append(SampleThread(x, gate))
            
        # Start threads and join
        for x in range(11):
            threads[x].start()        
            

        # Join with threads
        for x in range(11):
            threads[x].join()

        print 'Joined with threads'
        print 'Gate count=>',gate.get_count()
            
        # Exit and entry logs must be same
        print enterlog
        print exitlog

    def test2():
        """ Test code where gate is closed unsuccessfully """

        print 'test1()...'

        gate = ThreadGate(10, 5)
        
        random.seed()
        
        print 'Starting threads...'
        # Create 10 threads
        threads = []
        threads.append(OwnerThread(gate))
        
        for x in range(10):
            threads.append(SampleThread(x, gate))
            
        # Start threads and join
        for x in range(11):
            threads[x].start()        
            

        # Join with threads
        for x in range(11):
            threads[x].join()

        print 'Joined with threads'

        print 'Gate count=>',gate.get_count()
        
        # Exit and entry logs must be same
        print enterlog
        print exitlog        

    test1()

    while not enterlog.empty():
        print enterlog.get(),

    print

    while not exitlog.empty():
        print exitlog.get(),
        
    print
    
    test2()

    while not enterlog.empty():
        print enterlog.get(),

    print

    while not exitlog.empty():
        print exitlog.get(),
        
    print
    
    
if __name__ == "__main__":
    test()
            

This construct is quite useful in suspending a thread or a threadpool at a specific position to preserve state for an operation which needs to make sure that the threads don't change state. Once the operation is complete, the threads are unblocked and resume the order in which they got blocked.

I have used semaphore and locks here - it is quite possible for an alternative implementation to use events.

Thread barriers are similar constructs, but they don't have the concept of an owner and are used to sync up threads at some point in code. Also, a barrier does not ensure that threads resume the order in which they joined. A barrier has a pre-defined size and unblocking is automatic after the last thread joins it.

It is interesting to observe that threading/synchronization constructs have interesting parallels with railroads. The concept of semaphores originate from real "semaphores" (vertical poles with metal flags) for restricting trains used in 19th century. It is amusing to see thread gates are similar to single vehicle manned level crossings!

6 comments

david decotigny 13 years, 3 months ago  # | flag

I don't understand some things in revision 2 of this code:

  • position (I guess it should be renamed/retyped, as "is_open"/boolean for example) is never reset to 1 imho. Besides, there could be a race condition if position were reset to 1 in the open() at the location "position=0" is now (i'd say it should be before the sem.release)

  • race condition in enter: test for "position == 0" not atomic with all your synchros.

  • count is never decreased

  • If you protect the count incrementation with a mutex, you should also protect it in getcount(), it would be cleaner

  • I really really doubt count would be accurate as it is now, even if it were decreased, because it's based on a synchro completely isolated from the other sets of synchros. So I guess there would be transitory periods when it would report an inaccurate count

One thing about which I need to think more: I have the intuition that maybe 2 levels of lock (mutex / binary semaphore) is not needed here. But, well, I am not sure at all about that: for now I don't have any idea to suggest.

david decotigny 13 years, 3 months ago  # | flag

I thought a little about this syncho. Here are some additional comments on rev 2 of the code.

First of all, I have to remove the last statement in my previous comment: imho a 2-level synchro seems inevitable. Worse, I think that enforcing the Fifo ordering requires to have one lock for each thread waiting... this, if we consider we don't know anything about the scheduler (which is my case). If we can make any Fifo assumption about the scheduler, then, I still do think that a 2-level synchro is needed, as you implemented, but that there is no need to have one lock per queued thread (a simple condition+binary semaphore should be enough). So, please reconsider the last 2 sentences of my previous comment.

Second and more importantly, to me it appears the protocol for this gate synchro is not really rigourous as it is now. Waking up the threads in Fifo order does not tell anything about the order in which they will actually resume on the CPU, unless we use appropriate mutual exclusion when the threads start their execution: I didn't see any guarantee that the scheduler is Fifo. That's probably why you added the part you wanted "Fifo" inside the synchro itself (inside the enter(): you put your call to sleep() there, to show it's Fifo), but nothing guarantees anything after the enter(). So imho, you might see some cases where the order recorded in your exitlog is not the one effectively followed by the open() function.

That's why I think a synchro with an explicit enter/leave protocol would be more rigourous here: at least you can guarantee some exclusion inside the enter/leave which can enforce some Fifo ordering wrt the order in which the threads will be woken up by open().

This also means that, after the open, the threads will be serialized (mutual exclusion)... and raises the question "what to do if we have still threads in enter/leave, the gate is open, and new threads call enter...": do these new threads have to wait for the threads that are still in enter() ? Or can they execute right away, thus by-passing the Fifo order ?

Besides this, I still think that the implementation is broken as it is. For example calling open() doesn't re-open the gate: it just allows the blocked threads to be woken up. But any other threads coming after an open() will be queued as if the gate was still closed. This was not the case before close() was called the first time (in that case, the threads were not blocked at all).

david decotigny 13 years, 3 months ago  # | flag

Proposed implementation of the enter/leave protocol:

from collections import deque
...
class ThreadGate(object):
    """The unprotected_* methods are expected to be called with lock held. Use the non-unprotected_* variant otherwise"""

    def __init__(self, lock = None):
        if lock is None: lock = threading.Lock()
        self.lock       = lock
        self.owner      = None # threading ID
        self.waiters    = deque() # list of locks
        self.per_thread = threading.local()
        self.open       = self._synchronized(self.unprotected_open)
        self.close      = self._synchronized(self.unprotected_close)
        self.leave      = self._synchronized(self.unprotected_leave)
        self.count      = self._synchronized(self.unprotected_count)
        self.is_open    = self._synchronized(self.unprotected_is_open)

    def _synchronized(self, f):
        def new_f(*args, **kw):
            self.lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                self.lock.release()
        return new_f

    def unprotected_close(self):
        if self.owner is not None:
            raise ThreadGateException,"trying to close an already closed gate"
        self.owner = threading.currentThread()

    def unprotected_open(self):
        if self.owner is None:
            raise ThreadGateException,"trying to open an already opened gate"
        if threading.currentThread() != self.owner:
            raise ThreadGateException,"not owner, cannot open gate"
        self.owner = None
        self.__wakeup_head()

    def enter(self):
        """Lock expected NOT to be acquired, return True if was blocked"""
        self.lock.acquire()
        try:
            # The gate is not closed: don't block only if no others are blocked
            self.per_thread.is_blocked = (self.owner is not None) # and len(self.waiters) == 0
            if not self.per_thread.is_blocked: return False

            # Create our lock
            waiter = threading.Lock()
            waiter.acquire()
            self.waiters.append(waiter)

        finally:
            self.lock.release()

        # Go to sleep, waiting for some thread to wake-us up
        waiter.acquire()
        return True # Was blocked

    def unprotected_leave(self):
        if self.per_thread.is_blocked: self.__wakeup_head()

    def __wakeup_head(self):
        """Lock expected to be held by caller"""
        if not self.waiters: return # Nobody to wake up
        self.waiters.popleft().release()

    def unprotected_count(self):
        return len(self.waiters)

    def unprotected_is_open(self):
        return (self.owner is None)
david decotigny 13 years, 3 months ago  # | flag

I didn't have enough space to add comments. To the question "what to do if we have still threads in enter/leave, the gate is open, and new threads call enter?"... this implementation answers "they execute right away". If you want to have them queued after those that are still exiting, you can uncomment the:

and len(self.waiters) == 0

Following the answer to the previous question, it means that threads entering after the open() will be woken up in an arbitrary order. For that matter, the check in main() will have to be updated to take into account those threads that have been blocked by close() (hence the return value for enter()):

def test():
    """ Test code """
    NTHREADS = 255

    import random

    print 'test()...'

    enterlog = []
    exitlog = []
    gate = ThreadGate()

    random.seed()

    class SampleThread(threading.Thread):
        """ Sample thread class for gate demo """

        def __init__(self, index):
            self.index = index
            threading.Thread.__init__(self, name = 'Thread %d' % self.index)

        def run(self):

            # Sleep randomly
            time.sleep(random.choice(range(1,10)))
            # Mark entry to gate
            ct = threading.currentThread()

            print 'Thread %s - Entering Gate...' % ct.getName()
            if not gate.is_open():
                # Might catch more threads than those really blocked
                enterlog.append(self.index)            
            was_blocked = gate.enter()

            time.sleep(.1)

            # Mark exit out of gate
            print 'Thread %s - Exiting Gate (was blocked: %s)' \
                  % (ct.getName(), was_blocked)
            if was_blocked:
                exitlog.append(self.index)

            gate.leave()

    # Close the gate
    print 'Closing gate...'
    gate.close()

    print 'Starting threads...'
    # Create 10 threads
    threads = []
    for x in range(NTHREADS):
        threads.append(SampleThread(x))

    # Start threads and join
    for x in range(NTHREADS):
        threads[x].start()        

    # Open gate after sleeping some time
    time.sleep(5)
    print 'Opening gate'
    gate.open()

    # Join with threads
    for x in range(NTHREADS):
        threads[x].join()

    print 'Joined with threads'

    # Exit and entry logs must be same
    print "Entered and probably slept:", enterlog
    print "Exited after having slept:", exitlog
    assert([e for e in enterlog if e in exitlog] == exitlog)

Please note that there is a race condition in this test: the assertion may fail, because enterlog() is filled outside from the lock.

Anand (author) 13 years, 3 months ago  # | flag

Hi david,

I did not read your comments fully. But here is a version which I think fixes at least one of the problem you mentioned, namely that of managing the thread count. It also adds a size check to the gate which means that the owner can proceed from "close" only after either a fixed number of threads join the gate or a timeout occurs. The test cases illustrate both situations.

Please give your valuable comments.

Anand (author) 13 years, 3 months ago  # | flag

Btw, I don't think this is any longer a FIFO, because of blocking semantics on the semaphore. Still I have retained the title as "Multithreaded FIFO gate".