import sys
import threading
import time
import traceback

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3: same API under the renamed module


class easy_pool(object):
    """Small thread pool that maps a single function over queued items.

    Items are submitted with put(), processed by worker threads started via
    start_threads() (or run_all()), and results are collected from the
    output queue.  Exceptions raised by the function are captured as
    (traceback_text, exception_type_text, exception_value_text) tuples on
    the error queue instead of killing the worker.

    Attributes:
        Qin:  queue of (flag, item) pairs waiting to be processed.
        Qout: queue of results returned by the function.
        Qerr: queue of error tuples produced by err_msg().
        Pool: list of worker threads that are currently started.
        Func: the callable applied to every item.
    """

    def __init__(self, func):
        """Create an idle pool that will apply *func* to each queued item."""
        self.Qin = Queue.Queue()
        self.Qout = Queue.Queue()
        self.Qerr = Queue.Queue()
        self.Pool = []
        self.Func = func

    def process_queue(self):
        """Worker loop: consume (flag, item) pairs until a 'stop' flag arrives.

        Items flagged 'ok' are passed to self.Func; the result goes to Qout,
        or, if the call raises, an error tuple goes to Qerr.  Pairs with any
        other flag are discarded.
        """
        while True:
            flag, item = self.Qin.get()  # blocks until something is queued
            if flag == 'stop':
                break
            if flag == 'ok':
                try:
                    self.Qout.put(self.Func(item))
                except:  # noqa: E722 - deliberate: a worker must survive any failure
                    self.Qerr.put(self.err_msg())

    def start_threads(self, num_threads=5):
        """Start *num_threads* workers and record them in self.Pool."""
        for _ in range(num_threads):
            worker = threading.Thread(target=self.process_queue)
            worker.start()
            self.Pool.append(worker)

    def put(self, data, flag='ok'):
        """Queue *data* for processing; *flag* 'ok' means "run Func on it"."""
        self.Qin.put((flag, data))

    def get(self):
        """Return the next result, blocking until one is available."""
        return self.Qout.get()

    @staticmethod
    def _drain(source):
        """Yield items from *source* without blocking until it is empty."""
        while True:
            try:
                yield source.get_nowait()
            except Queue.Empty:
                return

    def get_errors(self):
        """Yield every error tuple currently queued, without blocking."""
        return self._drain(self.Qerr)

    def get_all(self):
        """Yield every result currently queued, without blocking."""
        return self._drain(self.Qout)

    def stop_threads(self):
        """Ask every worker to stop, wait for them to exit, and empty Pool.

        One 'stop' sentinel is queued per worker, behind any pending items,
        so work that was already queued is processed before the workers exit.
        """
        for _ in self.Pool:
            self.Qin.put(('stop', None))
        # join() waits for exit directly instead of polling is-alive with sleeps.
        for worker in self.Pool:
            worker.join()
        del self.Pool[:]

    def run_all(self, asap=None, num_threads=10, poll_interval=60):
        """Start workers, process the queued items and yield the results.

        Args:
            asap: when true, yield results while work is still in progress;
                otherwise wait for all queued work to finish, then yield.
            num_threads: number of worker threads to start.
            poll_interval: seconds to sleep between polls of the output
                queue in *asap* mode.

        Yields:
            Results taken from the output queue.

        Note:
            In *asap* mode the generator blocks until the first result
            arrives, so it never returns if no item ever produces a result.
        """
        self.start_threads(num_threads)
        if asap:
            # Do nothing until the first result arrives.
            yield self.get()
            while self.Qin.qsize():
                for result in self.get_all():
                    yield result
                time.sleep(poll_interval)
        self.stop_threads()
        for result in self.get_all():
            yield result

    def err_msg(self):
        """Describe the exception currently being handled in this thread.

        Returns:
            A tuple (formatted_traceback, exception_type, exception_value),
            each rendered as a string; the value is '' if it cannot be
            converted to a string.
        """
        # sys.exc_info() is per-thread; sys.exc_type/exc_value were process
        # globals (and no longer exist in Python 3), so workers could report
        # each other's exceptions.
        exc_type, exc_value, trace = sys.exc_info()
        try:
            try:
                value_text = str(exc_value)
            except Exception:
                value_text = ''
            return str(traceback.format_tb(trace)), str(exc_type), value_text
        finally:
            # Drop the traceback reference to avoid a frame reference cycle.
            del trace

    def qinfo(self):
        """Return ('in', pending_input_count, 'out', pending_output_count)."""
        return 'in', self.Qin.qsize(), 'out', self.Qout.qsize()