This class is used to synchronize worker threads that get their input from a common source that changes over time and may occasionally be empty.
The problem is that each thread is unaware of the other threads and has no way of knowing whether they will insert any new input. Instead of using a separate 'control' thread, or having the threads exit needlessly each time the input source is empty, a WorkersLounge instance can be used to synchronize them.
The most common example is a shared Queue.Queue object, where each thread may put additional jobs into it depending on its current job. When the queue is empty, the idle threads 'rest' in the 'lounge'. When the last thread holding a job tries to 'rest', all the threads exit. Whenever a thread puts new jobs into the queue, it should wake up any resting threads by calling the 'back_to_work' method.
import threading


class WorkersLounge(object):
    def __init__(self, total_workers_number):
        """
        @param total_workers_number: the maximum number of worker threads
        """
        self.total_workers_number = total_workers_number
        self.waiting_place = threading.Condition()
        self.work_done_event = threading.Event()

    def rest(self):
        """
        When a thread calls this method there are two possible options:
        - either there are other active threads, in which case the current thread waits
        - all other threads are already waiting, in which case they all exit
        @return: True if the caller thread should go back to work
                 False if the caller thread should exit
        """
        with self.waiting_place:
            if (len(self.waiting_place._Condition__waiters) ==
                    self.total_workers_number - 1):
                # This is the last worker, and it has nothing to do
                self.work_done_event.set()
                self.waiting_place.notifyAll()
                # Notify the caller there is no more work to do
                return False
            else:
                # Wait for a signal
                self.waiting_place.wait()
                return not self.work_done_event.isSet()

    def back_to_work(self):
        """
        Wake up all the waiting threads.
        Should be called whenever a thread puts new input into the common source.
        """
        # Wake up everybody
        with self.waiting_place:
            self.waiting_place.notifyAll()


if __name__ == "__main__":
    # Run test code
    import Queue
    import time

    print_lock = threading.Lock()

    def sync_print(text):
        with print_lock:
            print text

    def _thread_proc(input_queue, workers_lounge):
        thread_name = threading.currentThread().name
        while True:
            try:
                # Attempt to get input from the common queue
                input = input_queue.get(timeout=0.1)
                # Do something with the input, possibly inserting more jobs
                # back into the queue
                sync_print("%s got input %s" % (thread_name, input))
                time.sleep(1)
                if input < 5:
                    input_queue.put(input + 1)
                    input_queue.put(input + 1)
                    # Wake up any waiting thread
                    workers_lounge.back_to_work()
            except Queue.Empty:
                # The 'rest' method returns False if the thread should stop,
                # and blocks until someone wakes it up
                sync_print("%s is resting" % thread_name)
                if not workers_lounge.rest():
                    sync_print("%s finished working" % thread_name)
                    break

    # Create an initial input source
    input_queue = Queue.Queue()
    input_queue.put(1)
    input_queue.put(1)

    # Run worker threads
    threads_number = 5
    workers_lounge = WorkersLounge(total_workers_number=threads_number)
    threads = []
    for _i in range(threads_number):
        threads.append(
            threading.Thread(target=_thread_proc, args=(input_queue, workers_lounge)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
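Note that the listing above targets Python 2 and peeks at the Condition object's name-mangled private waiter list (_Condition__waiters), which is an implementation detail rather than a documented interface and does not exist under that name in Python 3. As a rough sketch only, the same idea can be expressed by counting the resting workers explicitly; the attribute names resting_workers and work_done below are illustrative choices, not part of the original recipe:

import threading

class WorkersLounge(object):
    """Sketch of a Python 3-friendly variant that counts resting workers."""

    def __init__(self, total_workers_number):
        self.total_workers_number = total_workers_number
        self.waiting_place = threading.Condition()
        self.work_done = False
        self.resting_workers = 0

    def rest(self):
        with self.waiting_place:
            if self.resting_workers == self.total_workers_number - 1:
                # Last active worker with nothing to do: release everybody
                self.work_done = True
                self.waiting_place.notify_all()
                return False
            # Wait until new work arrives or the last worker gives up
            self.resting_workers += 1
            self.waiting_place.wait()
            self.resting_workers -= 1
            return not self.work_done

    def back_to_work(self):
        with self.waiting_place:
            self.waiting_place.notify_all()

The explicit counter keeps the class independent of how the Condition stores its waiters internally, while the rest/back_to_work protocol stays the same as in the Python 2 listing.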
This pattern is ideal for web crawling scenarios, where you start with a small set of 'seed' URLs, each of which may lead to many new ones. Using the WorkersLounge, the threads patiently wait for input and exit once all reachable links have been explored (or a certain crawling depth has been reached).
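For illustration only, a crawler worker built on this pattern might look roughly like the following. It reuses the WorkersLounge class and the Python 2 Queue pattern from the listing above; fetch_page and extract_links are hypothetical helpers that are not defined here, and duplicate-URL tracking is omitted for brevity:

import Queue
import threading

def _crawler_proc(url_queue, workers_lounge, max_depth):
    while True:
        try:
            url, depth = url_queue.get(timeout=0.1)
            html = fetch_page(url)                  # hypothetical helper
            if depth < max_depth:
                for link in extract_links(html):    # hypothetical helper
                    url_queue.put((link, depth + 1))
                # Wake up any resting crawler threads
                workers_lounge.back_to_work()
        except Queue.Empty:
            if not workers_lounge.rest():
                break

# Seed the queue and start the crawler threads
url_queue = Queue.Queue()
url_queue.put(("http://example.com", 0))

threads_number = 5
workers_lounge = WorkersLounge(total_workers_number=threads_number)
threads = [threading.Thread(target=_crawler_proc,
                            args=(url_queue, workers_lounge, 3))
           for _i in range(threads_number)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()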