import threading


class WorkersLounge(object):
    """
    Coordinates a fixed pool of worker threads that share one input source.

    A worker that finds no input calls rest(). While at least one other
    worker is still active the caller blocks; once every worker is resting
    nobody is left to produce new input, so all of them are told to exit.
    """

    def __init__(self, total_workers_number):
        """
        @param total_workers_number: the maximum number of worker threads
        """
        self.total_workers_number = total_workers_number
        self.waiting_place = threading.Condition()
        self.work_done_event = threading.Event()
        # Number of workers currently blocked in rest(). Only read or
        # written while holding waiting_place. Kept explicitly instead of
        # peeking at the Condition's private waiter list, which is an
        # implementation detail that differs between Python versions.
        self._resting_workers = 0

    def rest(self):
        """
        When a thread calls this method there are two possible options:
        - either there are other active threads, in which case the
          current thread waits
        - all other threads are already waiting, in which case they all exit

        @return: True if the caller thread should go back to work
                 False if the caller thread should exit
        """
        with self.waiting_place:
            if self.work_done_event.is_set():
                # Completion was already announced; never block again.
                return False
            if self._resting_workers >= self.total_workers_number - 1:
                # This is the last active worker and it has nothing to do,
                # so no new input can ever appear: release everybody.
                self.work_done_event.set()
                self.waiting_place.notify_all()
                return False
            # Wait for a signal (either new work or "all done").
            self._resting_workers += 1
            self.waiting_place.wait()
            return not self.work_done_event.is_set()

    def back_to_work(self):
        """
        Wake up all the waiting threads. Should be called whenever a thread
        puts new input into the common source.
        """
        with self.waiting_place:
            # Every waiter is being released, so none of them counts as
            # resting any more. Resetting here, under the lock, rather than
            # in each woken thread, keeps the count correct even if the
            # caller reaches rest() before the woken threads run again.
            self._resting_workers = 0
            self.waiting_place.notify_all()


if __name__ == "__main__":
    # Run test code
    try:
        import Queue as queue  # Python 2
    except ImportError:
        import queue  # Python 3
    import time

    print_lock = threading.Lock()

    def sync_print(text):
        # Serialize output so lines from different threads do not interleave.
        with print_lock:
            print(text)

    def _thread_proc(input_queue, workers_lounge):
        thread_name = threading.current_thread().name
        while True:
            try:
                # Attempt to get input from the common queue
                job = input_queue.get(timeout=0.1)
            except queue.Empty:
                # The 'rest' method returns False if the thread should stop,
                # and blocks until someone wakes it up
                sync_print("%s is resting" % thread_name)
                if not workers_lounge.rest():
                    sync_print("%s finished working" % thread_name)
                    break
                continue

            # Do something with the input, possibly inserting more jobs
            # back into the queue
            sync_print("%s got input %s" % (thread_name, job))
            time.sleep(1)
            if job < 5:
                input_queue.put(job + 1)
                input_queue.put(job + 1)
                # Wake up any waiting thread
                workers_lounge.back_to_work()

    # Create an initial input source
    input_queue = queue.Queue()
    input_queue.put(1)
    input_queue.put(1)

    # Run worker threads
    threads_number = 5
    workers_lounge = WorkersLounge(total_workers_number=threads_number)
    threads = [
        threading.Thread(target=_thread_proc,
                         args=(input_queue, workers_lounge))
        for _i in range(threads_number)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()