You can find examples of how to do threading, but they do not show off a thread pool. My goal was a working thread pool example that is as small and simple as possible, showing the basic ideas without extraneous things to understand. To show off the thread pool, I want stopping and starting of the threads to be explicit. This means the pool won't start until you are ready, and it will keep running until you tell it to stop. The main thread puts data into the input queue and removes data from the output queue. The thread pool does the converse: it takes from the input queue and puts into the output queue. Errors are managed with a third queue, so there is a clean distinction between errors and successful results.
    import threading,Queue,time,sys,traceback

    #Globals (start with a capital letter)
    Qin  = Queue.Queue()
    Qout = Queue.Queue()
    Qerr = Queue.Queue()
    Pool = []

    def err_msg():
        #format_tb needs the traceback object, the third item of exc_info()
        trace= sys.exc_info()[2]
        try:
            exc_value=str(sys.exc_value)
        except:
            exc_value=''
        return str(traceback.format_tb(trace)),str(sys.exc_type),exc_value

    def get_errors():
        try:
            while 1:
                yield Qerr.get_nowait()
        except Queue.Empty:
            pass

    def process_queue():
        flag='ok'
        while flag !='stop':
            try:
                flag,item=Qin.get() #will wait here!
                if flag=='ok':
                    newdata='new'+item
                    Qout.put(newdata)
            except:
                Qerr.put(err_msg())

    def start_threads(amount=5):
        for i in range(amount):
            thread = threading.Thread(target=process_queue)
            thread.start()
            Pool.append(thread)

    def put(data,flag='ok'):
        Qin.put([flag,data])

    def get():
        return Qout.get() #will wait here!

    def get_all():
        try:
            while 1:
                yield Qout.get_nowait()
        except Queue.Empty:
            pass

    def stop_threads():
        #one stop flag per thread
        for i in range(len(Pool)):
            Qin.put(('stop',None))
        #poll until every thread has exited, removing one dead thread per pass
        while Pool:
            time.sleep(1)
            for index,the_thread in enumerate(Pool):
                if the_thread.isAlive():
                    continue
                else:
                    del Pool[index]
                    break

    #STANDARD use:
    for i in ('b','c'):
        put(i)
    start_threads()
    stop_threads()
    for i in get_all():
        print i
    for i in get_errors():
        print i

    #POOL use
    #put element into input queue
    put('a')

    #setup threads -- will run forever as a pool until you shutdown
    start_threads()

    for i in ('b','c'):
        put(i)

    #get an element from output queue
    print get()

    #put even more data in, 7 causes an error
    for i in ('d','e',7):
        put(i)
    #get whatever is available
    for i in get_all():
        print i

    #stop_threads only returns when all threads have stopped
    stop_threads()
    print '__threads finished last data available__'
    for i in get_all():
        print i
    for i in get_errors():
        print i

    #starting up threads again
    start_threads()
    put('f')
    stop_threads()
    print '__threads finished(again) last data available__'
    for i in get_all():
        print i
    for i in get_errors():
        print i
A fundamental issue with threading is how to manage input and output when many threads are working at the same time. With Python, the simplest way to approach the problem is with the thread-safe Queue module. Even with the module, you have to worry about when to block and when not to block on access to the queue. You do not want to run forever, nor do you want to end too early. The simple functions help manage that issue. For example, stop_threads waits until all the threads have stopped. However, get_all does not wait; it simply empties the output queue.
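The difference between blocking and non-blocking access is easier to see in isolation. The sketch below is not part of the recipe. It assumes Python 3, where the module is named `queue` rather than `Queue`, and `drain` is a hypothetical helper that mirrors what get_all does.

```python
import queue

def drain(q):
    """Yield whatever is in q right now, without waiting (like get_all)."""
    try:
        while True:
            yield q.get_nowait()
    except queue.Empty:
        return

q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)

first = q.get()            # blocking get: returns at once because data is present
rest = list(drain(q))      # non-blocking: takes what is left, then stops
leftover = list(drain(q))  # an empty queue yields nothing instead of hanging

# A blocking get with a timeout is a middle ground: wait briefly, then give up.
try:
    q.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True
```

A plain `q.get()` on the empty queue at the end would have waited forever, which is why the recipe only blocks in places where it knows data (or a stop flag) is coming.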
In the recipe, you define the work you want to do in process_queue (here, prefixing each item with 'new'). Each thread runs process_queue and keeps running until a stop flag is sent to it. You can stop and start the thread pool whenever you want.
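One way to make "define the work in process_queue" more concrete is to pass the work in as a function. What follows is a sketch under assumptions, not the recipe's code: it targets Python 3 (`queue` instead of `Queue`), the names `worker`, `run_pool`, `drain`, and the `STOP` sentinel are hypothetical, and it waits with `Thread.join` instead of polling with `time.sleep`.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel playing the role of the 'stop' flag

def worker(work, q_in, q_out, q_err):
    """Apply work to each item from q_in until the STOP sentinel arrives."""
    while True:
        item = q_in.get()          # blocks until something is available
        if item is STOP:
            break
        try:
            q_out.put(work(item))
        except Exception as exc:   # failures go to their own queue
            q_err.put((item, repr(exc)))

def drain(q):
    """Return everything currently in q as a list, without waiting."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

def run_pool(work, items, amount=5):
    """Start the threads, feed them items, stop them, return (results, errors)."""
    q_in, q_out, q_err = queue.Queue(), queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(work, q_in, q_out, q_err))
               for _ in range(amount)]
    for t in threads:
        t.start()
    for item in items:
        q_in.put(item)
    for _ in threads:              # one stop sentinel per thread
        q_in.put(STOP)
    for t in threads:
        t.join()                   # returns once that thread has exited
    return sorted(drain(q_out)), drain(q_err)

# Same work as the recipe: prefix each item with 'new'; 7 causes an error.
results, errors = run_pool(lambda s: "new" + s, ["a", "b", 7])
```

Results are sorted because the order in which threads finish is not guaranteed. As in the recipe, the bad item ends up in the error queue and does not stop the other items from being processed.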
You can reach me at pyguy2 on yahoo.