Welcome, guest | Sign In | My Account | Store | Cart

This class is used to synchronize worker threads that get their input from a common source that changes over time, and may even be empty on some occasions.

The problem is that the threads are unaware of the existence of other threads, and have no way of knowing whether any new input will be inserted by other threads. Instead of using a separate 'control' thread, or having the threads exit needlessly each time the input source is empty, a WorkersLounge instance can be used to synchronize them.

The commonest example is using a shared Queue.Queue object, where each thread may put additional jobs into it depending on its current job. When the queue is empty, the other threads 'rest' in the 'lounge'. When the last thread with a job is trying to 'rest', all the threads exit. When a thread puts new jobs into the queue, it should wake up any resting thread by calling the 'back_to_work' method.

Python, 90 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import threading

class WorkersLounge(object):
	
	def __init__(self, total_workers_number):
		"""
		@param total_workers_number: the maximum number of worker threads
		"""
		self.total_workers_number = total_workers_number
		self.waiting_place = threading.Condition()
		self.work_done_event = threading.Event()
		
	def rest(self):
		"""
		When a thread calls this method there are two possible options:
		 - either there are other active threads, in which case the current thread waits
		 - all other threads are already waiting, in which case they all exit
		
		@return: True if the caller thread should go back to work
			 False if the caller thread should exit
		"""
		with self.waiting_place:
			if (len(self.waiting_place._Condition__waiters) ==
                            self.total_workers_number-1):
				# This is the last worker, and it has nothing to do
				self.work_done_event.set()				
				self.waiting_place.notifyAll()				
				# Notify the caller there is no more work to do
				return False
			else:
				# Wait for a signal
				self.waiting_place.wait()				
				return not(self.work_done_event.isSet())			
		
	def back_to_work(self):
		"""
		Wake up all the waiting threads.
		Should be called whenever a thread puts new input into the common source.
		"""
		# Wake up everybody
		with self.waiting_place:
			self.waiting_place.notifyAll()

if __name__ == "__main__":
	# Run test code
	import Queue
	import time
	
	print_lock = threading.Lock()
	def sync_print(text):
		with print_lock:
			print text
	
	def _thread_proc(input_queue, workers_lounge):
		thread_name = threading.currentThread().name
		while 1:
			try:
				# Attempt to get input from the common queue
				input = input_queue.get(timeout=0.1)
				# Do something with the input, possibly inserting more jobs
				# back into the queue
				sync_print("%s got input %s"%(thread_name, input))	
				time.sleep(1)
				if (input < 5):
					input_queue.put(input+1)
					input_queue.put(input+1)
					# Wake up any waiting thread
					workers_lounge.back_to_work()
			except Queue.Empty:
				# The 'rest' method returns False if the thread should stop,
				# and blocks until someone wakes it up
				sync_print("%s is resting"%thread_name)
				if (workers_lounge.rest() == False):
					sync_print("%s finished working"%thread_name)
					break

	# Create an initial input source
	input_queue = Queue.Queue()
	input_queue.put(1)
	input_queue.put(1)
	# Run worker threads
	threads_number = 5
	workers_lounge = WorkersLounge(total_workers_number=threads_number)
	for _i in range(threads_number):
		threads.append(
                        threading.Thread(target=_thread_proc, args=(input_queue, workers_lounge)))
	for thread in threads:
		thread.start()
	for thread in threads:
		thread.join()

Ideal for web crawling scenarios, where you start with a small set of 'seed' URLS, and each of them may lead to many new ones. Using the WorkersLounge, the threads will patiently wait for input, and exit when all possible links were explored (or a certain crawling depth was reached).

1 comment

Anoop Jacob Thomas 13 years, 7 months ago  # | flag

add a new line at 84 and push all the lines from 84 down.

and line 84 will be

threads = []