Welcome, guest | Sign In | My Account | Store | Cart
import threading,Queue,time,sys,traceback

class easy_pool:
    def __init__(self,func):
        self.Qin  = Queue.Queue() 
        self.Qout = Queue.Queue()
        self.Qerr = Queue.Queue()
        self.Pool = []   
        self.Func=func
    def process_queue(self):
        flag='ok'
        while flag !='stop':
            flag,item=self.Qin.get() #will wait here!
            if flag=='ok':
                try:
                    self.Qout.put(self.Func(item))
                except:
                    self.Qerr.put(self.err_msg())
    def start_threads(self,num_threads=5):
        for i in range(num_threads):
             thread = threading.Thread(target=self.process_queue)
             thread.start()
             self.Pool.append(thread)
    def put(self,data,flag='ok'):
        self.Qin.put([flag,data]) 

    def get(self): return self.Qout.get() #will wait here!

    def get_errors(self):
        try:
            while 1:
                yield self.Qerr.get_nowait()
        except Queue.Empty:
            pass
    
    def get_all(self):
        try:
            while 1:
                yield self.Qout.get_nowait()
        except Queue.Empty:
            pass
        
    def stop_threads(self):
        for i in range(len(self.Pool)):
            self.Qin.put(('stop',None))
        while self.Pool:
            time.sleep(0.1)
            for index,the_thread in enumerate(self.Pool):
                if the_thread.isAlive():
                    continue
                else:
                    del self.Pool[index]
                break
    def run_all(self,asap=None,num_threads=10):
        if asap:
            self.start_threads(num_threads)
            #do nothing until 1st one arrives
            #assumes you'll get enough data for the threads not to hang
            yield self.get()
            
            while self.Qin.qsize():
                for i in self.get_all():
                    yield i
                    time.sleep(60)
            self.stop_threads()
            for i in self.get_all():
                yield i            
        else:            
            self.start_threads(num_threads)
            self.stop_threads()
            for i in self.get_all():
                yield i
    def err_msg(self):
        trace= sys.exc_info()[2]
        try:
            exc_value=str(sys.exc_value)
        except:
            exc_value=''
        return str(traceback.format_tb(trace)),str(sys.exc_type),exc_value
    def qinfo(self):
        return 'in',self.Qin.qsize(),'out',self.Qout.qsize()

History

  • revision 3 (17 years ago)
  • previous revisions are not available