You can find examples of how to do threading, but they do not show off a thread pool. My goal was to write the smallest, simplest working thread pool example I could, one that shows the basic ideas without extraneous things to understand. To show off the thread pool, I want starting and stopping the threads to be explicit: the pool won't start until you are ready, and it runs until you tell it to stop. The main thread puts data into the input queue and removes results from the output queue; the thread pool does the converse. Errors go to a separate queue, so there is a clean distinction between errors and successful results.
```python
import queue
import sys
import threading
import traceback

# Globals (start with a capital letter)
Qin = queue.Queue()
Qout = queue.Queue()
Qerr = queue.Queue()
Pool = []

def err_msg():
    # sys.exc_info() is per-thread; the old sys.exc_type/sys.exc_value
    # globals were shared by all threads and not safe to use here.
    exc_type, exc_value, trace = sys.exc_info()
    return str(traceback.format_tb(trace)), str(exc_type), str(exc_value)

def get_errors():
    try:
        while True:
            yield Qerr.get_nowait()
    except queue.Empty:
        pass

def process_queue():
    flag = 'ok'
    while flag != 'stop':
        try:
            flag, item = Qin.get()  # will wait here!
            if flag == 'ok':
                newdata = 'new' + item
                Qout.put(newdata)
        except Exception:
            Qerr.put(err_msg())

def start_threads(amount=5):
    for i in range(amount):
        thread = threading.Thread(target=process_queue)
        thread.start()
        Pool.append(thread)

def put(data, flag='ok'):
    Qin.put([flag, data])

def get():
    return Qout.get()  # will wait here!

def get_all():
    try:
        while True:
            yield Qout.get_nowait()
    except queue.Empty:
        pass

def stop_threads():
    # One stop flag per thread; each thread exits after taking one.
    for i in range(len(Pool)):
        Qin.put(('stop', None))
    # join() blocks until that thread has finished.
    for the_thread in Pool:
        the_thread.join()
    del Pool[:]

# STANDARD use:
for i in ('b', 'c'):
    put(i)
start_threads()
stop_threads()
for i in get_all():
    print(i)
for i in get_errors():
    print(i)

# POOL use
# put element into input queue
put('a')
# set up threads: they run as a pool until you shut them down
start_threads()
for i in ('b', 'c'):
    put(i)
# get an element from output queue
print(get())
# put even more data in, 7 causes an error
for i in ('d', 'e', 7):
    put(i)
# get whatever is available
for i in get_all():
    print(i)
# stop_threads only returns when all threads have stopped
stop_threads()
print('__threads finished last data available__')
for i in get_all():
    print(i)
for i in get_errors():
    print(i)
# starting up threads again
start_threads()
put('f')
stop_threads()
print('__threads finished(again) last data available__')
for i in get_all():
    print(i)
for i in get_errors():
    print(i)
```
A fundamental issue with threading is how to manage input and output when many threads are working at the same time. In Python the simplest approach is the thread-safe Queue module (renamed queue in Python 3). Even with the module, you have to decide when access to a queue should block and when it should not: you do not want to wait forever, nor do you want to stop too early. The small helper functions handle that. For example, stop_threads waits until all the threads have stopped, whereas get_all does not wait; it simply empties whatever is currently in the output queue.
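To make the blocking distinction concrete, here is a minimal Python 3 sketch (standard library `queue` only) contrasting the ways to read a queue: `get()` blocks until an item arrives, `get(timeout=...)` blocks for at most that many seconds and then raises `queue.Empty`, and `get_nowait()` never blocks. The helper names `drain` and `get_or_none` are hypothetical; `drain` does the same job as get_all but returns a list.

```python
import queue


def drain(q):
    """Return every item currently in q without blocking (like get_all)."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())  # raises queue.Empty instead of waiting
        except queue.Empty:
            return items


def get_or_none(q, seconds):
    """Wait at most `seconds` for one item; return None if nothing arrives."""
    try:
        return q.get(timeout=seconds)  # blocks, but only up to the timeout
    except queue.Empty:
        return None


q = queue.Queue()
for word in ('newb', 'newc'):
    q.put(word)

print(drain(q))             # ['newb', 'newc']
print(drain(q))             # [] because the queue is already empty; no waiting
print(get_or_none(q, 0.1))  # None after about 0.1 s instead of blocking forever
```

A timeout is a middle ground between the two extremes the recipe uses: it avoids hanging forever on an empty queue without giving up the moment the queue happens to be empty.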
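In the recipe, you define the work you want done in process_queue. Each thread runs process_queue and keeps running until it receives a stop flag. stop_threads sends one flag per thread, and because the flags are queued behind any pending data, items already put are processed before the threads exit. You can stop and start the thread pool whenever you want.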
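Because the work is hard-coded in process_queue, changing it means editing that function. One possible variation, sketched in Python 3 under the assumption that you want to pass the work in, hands each thread a callable through `threading.Thread(args=...)`. The names `worker`, `run_pool` and `work` are hypothetical; the stop message is the same `('stop', None)` the recipe uses, and one is sent per thread.

```python
import queue
import threading


def worker(work, qin, qout, qerr):
    """Apply `work` to each item until a ('stop', None) message arrives."""
    while True:
        flag, item = qin.get()  # blocks until something is queued
        if flag == 'stop':
            return
        try:
            qout.put(work(item))
        except Exception as exc:  # keep errors apart from good results
            qerr.put((item, repr(exc)))


def run_pool(work, items, amount=5):
    """Hypothetical helper: start `amount` threads, feed items, stop, collect."""
    qin, qout, qerr = queue.Queue(), queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(work, qin, qout, qerr))
               for _ in range(amount)]
    for t in threads:
        t.start()
    for item in items:
        qin.put(('ok', item))
    for _ in threads:
        qin.put(('stop', None))  # one stop flag per thread, behind the data
    for t in threads:
        t.join()  # returns once that thread has taken its stop flag
    # All threads have exited, so qsize() is exact here.
    results = [qout.get_nowait() for _ in range(qout.qsize())]
    errors = [qerr.get_nowait() for _ in range(qerr.qsize())]
    return results, errors


results, errors = run_pool(lambda s: 'new' + s, ['b', 'c', 7])
print(sorted(results))  # ['newb', 'newc']
print(errors)
```

Results are sorted before printing because the threads may finish in any order.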
You can reach me at pyguy2 on yahoo.
I like my version better :-).
Then you can use it like:
silly example, but you get the point :-)
Good example, but I wanted to show a simple thread pool. My example has two purposes: 1) to have as little knowledge overhead as possible, so no use of classes; 2) to show a thread pool, which means I want the starting and stopping of the pool to be explicit.
Exercise left to the reader...
Great example, thanks!
My favourite implementation of Threader has optional data_queue and result_queue as constructor arguments, and numthreads as an argument to run().
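The Threader code that comment refers to is not shown on this page, so the class below is only a guess at the interface it describes: optional `data_queue` and `result_queue` constructor arguments and a `numthreads` argument to `run()`. Everything else (the `func` argument, the private sentinel object used as a stop flag, and `run()` blocking until the queued data is consumed) is an assumption made to get a runnable Python 3 sketch. Error handling is left out, so `func` is assumed not to raise.

```python
import queue
import threading


class Threader:
    """Hypothetical sketch of the interface described above, not the original class."""

    _STOP = object()  # private sentinel that tells a worker thread to exit

    def __init__(self, func, data_queue=None, result_queue=None):
        self.func = func  # assumed: the per-item work, which must not raise here
        # Both queues are optional; a fresh Queue is created for any omitted one.
        self.data_queue = data_queue if data_queue is not None else queue.Queue()
        self.result_queue = result_queue if result_queue is not None else queue.Queue()

    def _work(self):
        while True:
            item = self.data_queue.get()
            if item is self._STOP:
                return
            self.result_queue.put(self.func(item))

    def run(self, numthreads=5):
        """Start numthreads threads, process everything queued, then stop them."""
        threads = [threading.Thread(target=self._work) for _ in range(numthreads)]
        for t in threads:
            t.start()
        for _ in threads:
            self.data_queue.put(self._STOP)  # queued behind the real data
        for t in threads:
            t.join()
        return self.result_queue


data = queue.Queue()
for word in ('b', 'c', 'd'):
    data.put(word)
results = Threader(lambda s: 'new' + s, data_queue=data).run(numthreads=3)
print(sorted(results.get_nowait() for _ in range(results.qsize())))  # ['newb', 'newc', 'newd']
```

Passing the queues in lets a caller feed data from elsewhere and read results with its own code, while `numthreads` stays a per-run choice.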
Hi, I tried the code on Windows XP and it works. But in Task Manager, python.exe always showed only one thread, even when I ran the code above with the default 5 threads. All results were calculated correctly. Is this expected behavior? How do I know for sure that, during the run, at most 5 threads were created and used?
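One way to check from inside Python, rather than in Task Manager, is to ask the threading module while the pool is running. This is a minimal Python 3 sketch with a pool shaped like the recipe's (threads block on an input queue until they receive a stop flag); the names `seen` and `seen_lock` are hypothetical. `threading.active_count()` counts live Thread objects including the main thread, and `threading.get_ident()` identifies the thread that processed each item.

```python
import queue
import threading

qin = queue.Queue()
seen = set()                 # identifiers of threads that processed at least one item
seen_lock = threading.Lock()


def process_queue():
    while True:
        flag, item = qin.get()
        if flag == 'stop':
            return
        with seen_lock:
            seen.add(threading.get_ident())


before = threading.active_count()  # includes the main thread
pool = [threading.Thread(target=process_queue) for _ in range(5)]
for t in pool:
    t.start()

# All five workers are alive now, blocked in qin.get().
print(threading.active_count() - before)  # 5

for i in range(100):
    qin.put(('ok', i))
for _ in pool:
    qin.put(('stop', None))
for t in pool:
    t.join()

print(len(seen) <= 5)                   # True: only the 5 pool threads did work
print(any(t.is_alive() for t in pool))  # False
```

The set may hold fewer than five identifiers if some threads never got an item, which is normal for a pool; what it cannot do is exceed the number of threads started.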
Since Python 2.5 it is much easier to write a ThreadPool: http://code.activestate.com/recipes/577187-python-thread-pool/
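Python 2.5 added `task_done()` and `join()` to Queue, which is one reason a pool can get shorter: the main thread can wait until every queued item has been processed without sending stop flags or polling. The sketch below (Python 3, where the module is `queue`) is an illustration of that feature, not the code from the linked recipe. The workers are daemon threads, so they are abandoned when the program exits rather than shut down explicitly, which is the opposite trade-off from this recipe's explicit start and stop.

```python
import queue
import threading

qin = queue.Queue()
qout = queue.Queue()
qerr = queue.Queue()


def process_queue():
    while True:
        item = qin.get()  # daemon workers wait here for the life of the program
        try:
            qout.put('new' + item)
        except Exception as exc:
            qerr.put(repr(exc))  # errors stay separate from results
        finally:
            qin.task_done()  # exactly one task_done() per item taken


for _ in range(5):
    threading.Thread(target=process_queue, daemon=True).start()

for item in ('a', 'b', 'c'):
    qin.put(item)

qin.join()  # returns once every put() item has been marked done
print(sorted(qout.get_nowait() for _ in range(qout.qsize())))  # ['newa', 'newb', 'newc']
```

Calling `task_done()` in a `finally` clause matters: if a failing item skipped it, `qin.join()` would never return.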
Unnecessarily long code to explain a thread pool. Maybe it was written for an older version of Python.
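For comparison, Python 3.2 and later ship a ready-made pool in the standard library, `concurrent.futures.ThreadPoolExecutor`, which is neither this recipe's approach nor the linked one. A minimal sketch of the same `'new' + item` job follows; the function name `work` is hypothetical. The `with` block shuts the pool down when it exits, and `Future.exception()` waits for each call and returns the exception or `None`, which keeps errors separate from results much like the recipe's error queue.

```python
from concurrent.futures import ThreadPoolExecutor


def work(item):
    return 'new' + item  # raises TypeError for non-strings, like the recipe's 7


results, errors = [], []
with ThreadPoolExecutor(max_workers=5) as pool:  # shut down on leaving the block
    futures = [pool.submit(work, item) for item in ('a', 'b', 7)]
    for f in futures:
        exc = f.exception()  # waits for this call to finish
        if exc is None:
            results.append(f.result())
        else:
            errors.append(repr(exc))

print(results)      # ['newa', 'newb']
print(len(errors))  # 1
```

Results come back in submission order here because the futures are read in the order they were created, even if the threads finish in a different order.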