This recipe shows how to build a thread pool, extending the recipe at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/302746. It is a Python class that creates a thread pool around a function you define. As in the earlier recipe, the point is a pool that you can start and stop at will. The pool can also mimic more conventional thread use, which takes as little as 3 lines of simple code.

Python, 81 lines
import threading,Queue,time,sys,traceback

class easy_pool:
    def __init__(self,func):
        self.Qin  = Queue.Queue() 
        self.Qout = Queue.Queue()
        self.Qerr = Queue.Queue()
        self.Pool = []   
        self.Func=func
    def process_queue(self):
        flag='ok'
        while flag !='stop':
            flag,item=self.Qin.get() #will wait here!
            if flag=='ok':
                try:
                    self.Qout.put(self.Func(item))
                except:
                    self.Qerr.put(self.err_msg())
    def start_threads(self,num_threads=5):
        for i in range(num_threads):
             thread = threading.Thread(target=self.process_queue)
             thread.start()
             self.Pool.append(thread)
    def put(self,data,flag='ok'):
        self.Qin.put([flag,data]) 

    def get(self): return self.Qout.get() #will wait here!

    def get_errors(self):
        try:
            while 1:
                yield self.Qerr.get_nowait()
        except Queue.Empty:
            pass
    
    def get_all(self):
        try:
            while 1:
                yield self.Qout.get_nowait()
        except Queue.Empty:
            pass
        
    def stop_threads(self):
        for i in range(len(self.Pool)):
            self.Qin.put(('stop',None))
        while self.Pool:
            time.sleep(0.1)
            for index,the_thread in enumerate(self.Pool):
                if the_thread.isAlive():
                    continue
                else:
                    del self.Pool[index]
                break
    def run_all(self,asap=None,num_threads=10):
        if asap:
            self.start_threads(num_threads)
            #do nothing until 1st one arrives
            #assumes you'll get enough data for the threads not to hang
            yield self.get()
            
            while self.Qin.qsize():
                for i in self.get_all():
                    yield i
                time.sleep(0.1) #short pause between drains, not a long sleep after every item
            self.stop_threads()
            for i in self.get_all():
                yield i            
        else:            
            self.start_threads(num_threads)
            self.stop_threads()
            for i in self.get_all():
                yield i
    def err_msg(self):
        exc_type,exc_value,trace=sys.exc_info() #per-thread, unlike sys.exc_type/sys.exc_value
        try:
            exc_value=str(exc_value)
        except:
            exc_value=''
        return str(traceback.format_tb(trace)),str(exc_type),exc_value
    def qinfo(self):
        return 'in',self.Qin.qsize(),'out',self.Qout.qsize()
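
The core of the class is the worker loop in process_queue together with the 'stop' flags that stop_threads puts on the input queue. The following is a minimal sketch of just that pattern, written for Python 3 (an assumption on my part; the recipe itself is Python 2). The name run_pool is hypothetical and is not part of the recipe, and the sketch uses join() rather than the polling loop in stop_threads.

```python
import queue
import threading


def run_pool(func, items, num_threads=3):
    """Run func over items on worker threads; return (results, errors)."""
    q_in, q_out, q_err = queue.Queue(), queue.Queue(), queue.Queue()

    def worker():
        while True:
            flag, item = q_in.get()  # blocks until work or a stop flag arrives
            if flag == 'stop':
                break
            try:
                q_out.put(func(item))
            except Exception as exc:
                q_err.put((item, repr(exc)))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        q_in.put(('ok', item))
    for _ in threads:
        q_in.put(('stop', None))  # one stop flag per worker
    for t in threads:
        t.join()

    # All workers have exited, so the queues can be drained without blocking.
    results = []
    while not q_out.empty():
        results.append(q_out.get_nowait())
    errors = []
    while not q_err.empty():
        errors.append(q_err.get_nowait())
    return results, errors
```

Because the stop flags are queued behind the real work, every item is processed before the workers exit. Results come back in completion order, not input order.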

If you have a function that you want to thread:

def work1(item):
    time.sleep(1)
    return 'hi '+item

You can use easy_pool either to thread something simply or, more strictly, as a pool.

STANDARD thread use:

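With 3 lines of code you can run it and get results; a fourth line prints any errors. Here thp is assumed to be the module in which the easy_pool class is saved. The item 1 is not a string, so work1 fails on it and the failure ends up in the error queue.

Basic way to do it:

t=thp.easy_pool(work1)
for i in ('a','b','c',1): t.put(i)
for i in t.run_all(): print i
for i in t.get_errors(): print 'error',i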
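
Each entry yielded by get_errors is the tuple built by err_msg: the formatted traceback, the exception type, and the exception value. As a rough illustration of how such a tuple can be captured, here is a small Python 3 sketch (the recipe is Python 2). The names describe_current_exception and safe_call are hypothetical helpers, not part of the recipe.

```python
import sys
import traceback


def describe_current_exception():
    """Return (traceback text, exception type name, message) for the active exception."""
    exc_type, exc_value, tb = sys.exc_info()  # per-thread exception state
    return ''.join(traceback.format_tb(tb)), exc_type.__name__, str(exc_value)


def safe_call(func, item):
    """Call func(item); return ('ok', result) or ('error', description)."""
    try:
        return 'ok', func(item)
    except Exception:
        return 'error', describe_current_exception()
```

Calling safe_call(work1, 1) with the work1 above would return an 'error' status whose description names a TypeError, because a string cannot be concatenated with an integer.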

By default, run_all starts the threads, waits until they have all stopped, and only then yields the results. If you set asap to a true value, run_all does not wait for everything to finish: it blocks until the first result arrives in the output queue, yields results as they become available, and ends only after all threads have stopped.
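
To make the "as soon as possible" idea concrete, here is a minimal Python 3 sketch of a generator that yields each result the moment it is ready. It is not the recipe's implementation: instead of polling the size of the input queue as run_all does, it counts how many results are still outstanding, and it tags each result with 'ok' or 'error' so a failing item cannot stall the count. The names results_asap and slow_square are hypothetical.

```python
import queue
import threading
import time

_STOP = object()  # unique marker telling a worker to exit


def results_asap(func, items, num_threads=4):
    """Yield ('ok', result) or ('error', text) for each item as it completes."""
    q_in, q_out = queue.Queue(), queue.Queue()

    def worker():
        while True:
            item = q_in.get()
            if item is _STOP:
                break
            try:
                q_out.put(('ok', func(item)))
            except Exception as exc:
                q_out.put(('error', repr(exc)))

    pending = 0
    for item in items:
        q_in.put(item)
        pending += 1
    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for _ in threads:
        q_in.put(_STOP)  # queued behind the real work, one per worker
    try:
        while pending:
            yield q_out.get()  # blocks only until the next result is ready
            pending -= 1
    finally:
        for t in threads:
            t.join()


def slow_square(n):
    """Example workload: larger inputs take longer."""
    time.sleep(0.05 * n)
    return n * n
```

With list(results_asap(slow_square, [3, 1, 2])) the quicker items would normally be yielded first, though the exact order depends on thread scheduling.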

This turns on asap and changes the number of threads to 25:

for i in t.run_all(asap=True,num_threads=25): print i

POOL USE

If you want to use it as a pool that runs until you explicitly shut it down, you can do this:

t=thp.easy_pool(work1)

#add to input queue
for i in ('d','e','f'): t.put(i)

#start 8 threads
t.start_threads(8)

#add more to input queue, 7 will make an error
for i in ('aa','bb','cc',7,'dd','ee','ff'): t.put(i)

#wait here until a single result arrives
print '1st result',t.get()

#get whatever data is available, not waiting
for i in t.get_all(): print i
for i in t.get_errors(): print i

#decide you've done enough work and shutdown the threads
t.stop_threads()

#now threads have stopped, get remaining available data
for i in t.get_all(): print i
for i in t.get_errors(): print i
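
For comparison only: on Python 3, the standard library's concurrent.futures module covers similar ground. The sketch below is a different technique from the recipe, not a port of it. It runs the same inputs through a ThreadPoolExecutor and separates results from errors much as get_all and get_errors do. The function name run_with_executor is hypothetical, and work1 here omits the sleep.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def work1(item):
    return 'hi ' + item  # raises TypeError for non-string items such as 7


def run_with_executor(items, num_threads=8):
    """Run work1 over items; return (results, errors) in completion order."""
    results, errors = [], []
    # Leaving the with block shuts the pool down and waits for its threads.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = {pool.submit(work1, item): item for item in items}
        for future in as_completed(futures):
            exc = future.exception()
            if exc is None:
                results.append(future.result())
            else:
                errors.append((futures[future], repr(exc)))
    return results, errors
```

Unlike easy_pool, the executor cannot be restarted after it has been shut down, so the stop-and-start-at-will behavior of this recipe has no direct equivalent there.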

You can reach me at pyguy2 on yahoo.com