This recipe shows how to build a thread pool, building on the recipe at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/302746. It is a Python class that essentially makes a thread pool for a function you define. Like the earlier example, I want to show off the power of having a thread pool that you can stop and start at will. Interestingly, you can also mimic more standard thread use with the pool -- which I show in as few as 3 lines of simple code.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | import threading,Queue,time,sys,traceback
class easy_pool:
    """Thread pool that applies one function to every item fed to it.

    Work items are handed to the worker threads through ``Qin``.  Each
    worker calls ``Func(item)`` and puts the return value on ``Qout``; if
    ``Func`` raises, the tuple built by ``err_msg`` is put on ``Qerr``
    instead and the worker keeps running.  Results are queued as workers
    finish them, so with several threads they are not guaranteed to come
    out in input order.

    Attributes:
        Qin:  queue of ``(flag, item)`` pairs waiting to be processed.
        Qout: queue of return values from ``Func``.
        Qerr: queue of error records (see ``err_msg``).
        Pool: list of the worker threads that are currently started.
        Func: the callable applied to every item.
    """

    def __init__(self, func):
        """Create an idle pool for *func*; no threads are started yet."""
        self.Qin = Queue.Queue()
        self.Qout = Queue.Queue()
        self.Qerr = Queue.Queue()
        self.Pool = []
        self.Func = func

    def process_queue(self):
        """Worker loop: handle 'ok' items from Qin until a 'stop' flag arrives.

        Entries whose flag is neither 'ok' nor 'stop' are consumed and
        ignored.
        """
        flag = 'ok'
        while flag != 'stop':
            flag, item = self.Qin.get()  # blocks until something is queued
            if flag != 'ok':
                continue
            try:
                self.Qout.put(self.Func(item))
            except:
                # Deliberately catch everything: a worker must survive any
                # failure in Func so the remaining items still get processed.
                self.Qerr.put(self.err_msg())

    def start_threads(self, num_threads=5):
        """Start *num_threads* workers and remember them in ``Pool``."""
        for _ in range(num_threads):
            worker = threading.Thread(target=self.process_queue)
            worker.start()
            self.Pool.append(worker)

    def put(self, data, flag='ok'):
        """Queue *data* for processing.

        *flag* is 'ok' for normal work; a 'stop' flag makes the worker that
        receives it exit.
        """
        # Same (flag, item) tuple shape that stop_threads() enqueues.
        self.Qin.put((flag, data))

    def get(self):
        """Return the next result, blocking until one is available."""
        return self.Qout.get()

    def get_errors(self):
        """Yield every error record currently on ``Qerr`` without blocking."""
        try:
            while True:
                yield self.Qerr.get_nowait()
        except Queue.Empty:
            pass

    def get_all(self):
        """Yield every result currently on ``Qout`` without blocking."""
        try:
            while True:
                yield self.Qout.get_nowait()
        except Queue.Empty:
            pass

    def stop_threads(self):
        """Ask every worker to stop and wait until all of them have exited.

        One 'stop' entry is queued per worker, behind any work already
        waiting in ``Qin``, so queued items are processed before shutdown.
        ``Pool`` is empty when this returns.
        """
        for _ in self.Pool:
            self.Qin.put(('stop', None))
        while self.Pool:
            worker = self.Pool[-1]
            # A timed join returns as soon as the thread ends, instead of
            # paying a fixed sleep for every thread in the pool.
            worker.join(0.1)
            if not worker.is_alive():
                self.Pool.pop()

    def run_all(self, asap=None, num_threads=10, poll_interval=60):
        """Start workers, process everything queued, stop, and yield results.

        Args:
            asap: when false (default) nothing is yielded until all workers
                have stopped.  When true, results are yielded while work is
                still in progress.
            num_threads: number of worker threads to start.
            poll_interval: seconds to sleep between drains of the output
                queue in asap mode.

        Yields:
            The return values of ``Func``.

        Note:
            In asap mode the generator first blocks until one result
            exists.  If nothing was queued, or every item raises, that wait
            never returns.
        """
        self.start_threads(num_threads)
        if asap:
            # Do nothing until the first result arrives.
            yield self.get()
            while self.Qin.qsize():
                for result in self.get_all():
                    yield result
                time.sleep(poll_interval)
        self.stop_threads()
        for result in self.get_all():
            yield result

    def err_msg(self):
        """Describe the exception currently being handled in this thread.

        Returns:
            A 3-tuple of strings: the formatted traceback list, the
            exception type, and the exception value ('' if the value cannot
            be converted to a string).
        """
        # sys.exc_info() is specific to the calling thread; the legacy
        # sys.exc_type / sys.exc_value globals are shared by all threads.
        exc_type, exc_value, trace = sys.exc_info()
        try:
            value_text = str(exc_value)
        except Exception:
            value_text = ''
        return str(traceback.format_tb(trace)), str(exc_type), value_text

    def qinfo(self):
        """Return ``('in', <Qin size>, 'out', <Qout size>)``."""
        return 'in', self.Qin.qsize(), 'out', self.Qout.qsize()
|
If you have a function that you want to thread:

    def work1(item):
        time.sleep(1)
        return 'hi '+item
You can use easy_pool either to simply thread something or, more strictly, as a pool.
STANDARD thread use:
With 3 lines of code you can run it and get results.
Basic way to do it
    t=thp.easy_pool(work1)
    for i in ('a','b','c',1): t.put(i)
    for i in t.run_all(): print i
    for i in t.get_errors(): print 'error',i
The method run_all will by default wait until all threads have stopped before returning data. If you set asap to be true, run_all will not wait until everything is finished. It returns data as soon as possible by waiting until the first data arrives in the output queue and will not finish until all threads have stopped.
This turns on asap and changes the number of threads to 25:
for i in t.run_all(asap=True,num_threads=25): print i
POOL USE
If you want to use it as a pool which will run until you explicitly shut down the pool, you can do this:
t=thp.easy_pool(work1)
add to input queue
for i in ('d','e','f'): t.put(i)
start 8 threads
t.start_threads(8)
add more to input queue, 7 will make an error
for i in ('aa','bb','cc',7,'dd','ee','ff'): t.put(i)
wait here until a single result arrives
print '1st result',t.get()
get whatever data is available, not waiting
    for i in t.get_all(): print i
    for i in t.get_errors(): print i
decide you've done enough work and shutdown the threads
t.stop_threads()
now threads have stopped, get remaining available data
    for i in t.get_all(): print i
    for i in t.get_errors(): print i
You can reach me at pyguy2 on yahoo.com