Welcome, guest | Sign In | My Account | Store | Cart

You can find examples on how to do threading, but they do not show off a thread pool. My goal was to get as small and simple as possible working thread pool example to show off the basic ideas without having extraneous things to understand. To show off the thread pool, I want stopping and starting of the threads to be explicit. This means the pool won't start until you are ready and will run forever until you are ready for it to stop. The main thread puts into the input queue and removes data from the output queue. The thread pool simply does the converse. Errors are also managed with another queue, so there is a clean distinction between errors and successful results.

Python, 97 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import threading,Queue,time,sys,traceback

#Globals (start with a captial letter)
Qin  = Queue.Queue() 
Qout = Queue.Queue()
Qerr = Queue.Queue()
Pool = []   

def err_msg():
    trace= sys.exc_info()[2]
    try:
        exc_value=str(sys.exc_value)
    except:
        exc_value=''
    return str(traceback.format_tb(trace)),str(sys.exc_type),exc_value

def get_errors():
    try:
        while 1:
            yield Qerr.get_nowait()
    except Queue.Empty:
        pass

def process_queue():
    flag='ok'
    while flag !='stop':
        try:
            flag,item=Qin.get() #will wait here!
            if flag=='ok':
                newdata='new'+item
                Qout.put(newdata)
        except:
            Qerr.put(err_msg())
            
def start_threads(amount=5):
    for i in range(amount):
         thread = threading.Thread(target=process_queue)
         thread.start()
         Pool.append(thread)
def put(data,flag='ok'):
    Qin.put([flag,data]) 

def get(): return Qout.get() #will wait here!

def get_all():
    try:
        while 1:
            yield Qout.get_nowait()
    except Queue.Empty:
        pass
def stop_threads():
    for i in range(len(Pool)):
        Qin.put(('stop',None))
    while Pool:
        time.sleep(1)
        for index,the_thread in enumerate(Pool):
            if the_thread.isAlive():
                continue
            else:
                del Pool[index]
            break
#STANDARD use:
for i in ('b','c'): put(i)
start_threads()
stop_threads()
for i in get_all(): print i
for i in get_errors(): print i

#POOL use
#put element into input queue
put('a')

#setup threads -- will run forever as a pool until you shutdown
start_threads() 

for i in ('b','c'): put(i)

#get an element from output queue
print get() 

#put even more data in, 7 causes an error
for i in ('d','e',7): put(i)
#get whatever is available
for i in get_all(): print i

#stop_threads only returns when all threads have stopped
stop_threads()
print '__threads finished last data available__'
for i in get_all(): print i
for i in get_errors(): print i
#starting up threads again
start_threads()
put('f')
stop_threads()
print '__threads finished(again) last data available__'
for i in get_all(): print i
for i in get_errors(): print i

A fundamental issue with threading is how do you manage input and output when you have many threads working all at the same time. With python the most simple way to approach the problem is with the thread-safe Queue module. Even with the module, you have to worry about when to block and not block your access to the queue. You do not want to run forever nor do you want to end too early. The simple functions help manage that issue. For example, stop_threads waits until all the threads have stopped. However, get_all does not wait, it simply empties the output queue.

In the recipe, you define the work you want to do in process_queue. Each thread runs process_queue and will run forever until a stop flag is sent to it. You can stop and start the thread pool whenever you want.

You can reach me at pyguy2 on yahoo.

6 comments

Justin A 19 years, 7 months ago  # | flag

I like my version better :-).

import threading,Queue
import socket

import socket
#import time,random # temp

class Threader:
    def __init__(self, numthreads):
        self._numthreads=numthreads

    def get_data(self,):
        raise NotImplementedError, "You must implement get_data as a function that returns an iterable"
        return range(10000)
    def handle_data(self,data):
        raise NotImplementedError, "You must implement handle_data as a function that returns anything"
        time.sleep(random.randrange(1,5))
        return data*data
    def handle_result(self, data, result):
        raise NotImplementedError, "You must implement handle_result as a function that does anything"
        print data, result

    def _handle_data(self):
        while 1:
            x=self.Q.get()
            if x is None:
                break
            self.DQ.put((x,self.handle_data(x)))

    def _handle_result(self):
        while 1:
            x,xa=self.DQ.get()
            if x is None:
                break
            self.handle_result(x, xa)

    def run(self):
        if hasattr(self, "prerun"):
            self.prerun()
        self.Q=Queue.Queue()
        self.DQ=Queue.Queue()
        ts=[]
        for x in range(self._numthreads):
            t=threading.Thread(target=self._handle_data)
            t.start()
            ts.append(t)

        at=threading.Thread(target=self._handle_result)
        at.start()

        try :
            for x in self.get_data():
                self.Q.put(x)
        except NotImplementedError, e:
            print e
        for x in range(self._numthreads):
            self.Q.put(None)
        for t in ts:
            t.join()
        self.DQ.put((None,None))
        at.join()
        if hasattr(self, "postrun"):
            return self.postrun()
        return None

Then you can use it like:

from threader import Threader
import time

class ttest(Threader):
    def get_data(self):
        return range(100)

    def handle_data(self,data):
        return data*data

    def handle_result(self, data, result):
        self.res.append((data,result))
        #print "%d: %d" % (data, result)

    def prerun(self):
        self.res=[]
    def postrun(self):
        return self.res



a=ttest(10)
for n,ns in  a.run():
    print n,ns

silly example, but you get the point :-)

John Nielsen (author) 19 years, 6 months ago  # | flag

good example but I wanted to show a simple thread pool. My example has 2 purposes. 1)to have as little knowledge overhead as possible -- so no use of classes 2)To show how to have a thread pool -- which means, I want the starting and stopping of the pool to be explicit

Lloyd Girty 18 years ago  # | flag

exercise left to the reader.... Great example, thanks!

My favourite implementation of Threader has optional data_queue and result_queue as constructor arguments, and numthreads as an argument to run().

Larry Wang 15 years, 6 months ago  # | flag

Hi, I tried the code in winXP, it works. But looking at the task manager, python.exe has always had only one thread running even I ran the code above with default 5 threads. All results were calculated correctly. So is this expected behavior? How do I know for sure during the process, at most 5 threads were created and used?

Emilio Monti 13 years, 11 months ago  # | flag

Since Python 2.5 it is much easier to write a ThreadPool: http://code.activestate.com/recipes/577187-python-thread-pool/

Nilay 10 years, 8 months ago  # | flag

Unnecessarily long code to explain ThreadPool. May be it was written for older version of Python.