# Welcome, guest | Sign In | My Account | Store | Cart
import threading

class WorkersLounge(object):
	"""
	Meeting point that lets a fixed pool of worker threads detect that all
	of them have run out of work.

	A worker calls rest() when it finds nothing to do, and any worker that
	produces new input calls back_to_work(). When every worker is resting at
	the same time, nobody is left to produce input, so all of them are told
	to exit.
	"""

	def __init__(self, total_workers_number):
		"""
		@param total_workers_number: the maximum number of worker threads
		"""
		self.total_workers_number = total_workers_number
		self.waiting_place = threading.Condition()
		self.work_done_event = threading.Event()
		# Number of workers that started resting since the last wake-up.
		# Tracked explicitly instead of peeking at the Condition's private
		# waiter list, which is an implementation detail whose name differs
		# between Python versions.
		self._resting_workers = 0
		# Incremented on every wake-up, so a resting worker can tell a real
		# wake-up apart from a wait() that returned for any other reason.
		self._wake_round = 0

	def _wake_everybody(self):
		"""
		Wake all resting workers. The caller must hold self.waiting_place.
		"""
		# Everybody who was resting counts as active again from now on
		self._resting_workers = 0
		self._wake_round += 1
		self.waiting_place.notify_all()

	def rest(self):
		"""
		When a thread calls this method there are two possible options:
		 - either there are other active threads, in which case the current thread waits
		 - all other threads are already waiting, in which case they all exit

		@return: True if the caller thread should go back to work
			 False if the caller thread should exit
		"""
		with self.waiting_place:
			if self.work_done_event.is_set():
				# Work was already declared done; waiting now would block
				# forever because nobody is left to wake this thread up
				return False
			if self._resting_workers >= self.total_workers_number - 1:
				# This is the last worker, and it has nothing to do
				self.work_done_event.set()
				self._wake_everybody()
				# Notify the caller there is no more work to do
				return False
			self._resting_workers += 1
			entered_round = self._wake_round
			# Wait for a signal; loop so that only a real wake-up ends the rest
			while entered_round == self._wake_round:
				self.waiting_place.wait()
			return not self.work_done_event.is_set()

	def back_to_work(self):
		"""
		Wake up all the waiting threads.
		Should be called whenever a thread puts new input into the common source.
		"""
		# Wake up everybody
		with self.waiting_place:
			self._wake_everybody()

if __name__ == "__main__":
	# Run test code: a few worker threads consume numbers from a shared queue,
	# feed larger numbers back into it, and use a WorkersLounge to find out
	# when the queue has been drained for good.
	try:
		import Queue  # Python 2 module name
	except ImportError:
		import queue as Queue  # The module was renamed in Python 3
	import time

	print_lock = threading.Lock()
	def sync_print(text):
		"""Print text under a lock so output of different threads does not interleave."""
		with print_lock:
			print(text)

	def _thread_proc(input_queue, workers_lounge):
		"""
		Worker loop: handle jobs from the queue until the lounge says to stop.

		@param input_queue: the queue shared by all the workers
		@param workers_lounge: the WorkersLounge shared by all the workers
		"""
		thread_name = threading.currentThread().name
		while True:
			try:
				# Attempt to get input from the common queue
				job = input_queue.get(timeout=0.1)
			except Queue.Empty:
				# The 'rest' method returns False if the thread should stop,
				# and blocks until someone wakes it up
				sync_print("%s is resting"%thread_name)
				if not workers_lounge.rest():
					sync_print("%s finished working"%thread_name)
					break
				continue
			# Do something with the input, possibly inserting more jobs
			# back into the queue
			sync_print("%s got input %s"%(thread_name, job))
			time.sleep(1)
			if job < 5:
				input_queue.put(job+1)
				input_queue.put(job+1)
				# Wake up any waiting thread
				workers_lounge.back_to_work()

	# Create an initial input source
	input_queue = Queue.Queue()
	input_queue.put(1)
	input_queue.put(1)
	# Run worker threads
	threads_number = 5
	workers_lounge = WorkersLounge(total_workers_number=threads_number)
	# The list has to exist before the threads are started and joined
	threads = [
		threading.Thread(target=_thread_proc, args=(input_queue, workers_lounge))
		for _i in range(threads_number)
	]
	for thread in threads:
		thread.start()
	for thread in threads:
		thread.join()

# History