A thread pool class that takes arbitrary callables as work units, and supports callbacks when the work unit is complete.

Python, 214 lines
import threading
from time import sleep

# Ensure booleans exist (not needed for Python 2.2.1 or higher)
try:
    True
except NameError:
    False = 0
    True = not False

class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""
    
    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""
        
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""
        
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):
        
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):

        """Return the number of threads in the pool."""
        
        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
        
        if self.__isJoining == True:
            return False
        if not callable(task):
            return False
        
        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""
        
        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()
    
    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            # Ask every thread to exit while we still hold references
            for t in self.__threads:
                t.goAway()

            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.join()

            # Only now is it safe to forget the threads
            del self.__threads[:]

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()


        
class ThreadPoolThread(threading.Thread):

    """ Pooled thread class. """
    
    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """
        
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False
        
    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """
        
        while self.__isDying == False:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))
    
    def goAway(self):

        """ Exit the run loop next time through."""
        
        self.__isDying = True

# Usage example
if __name__ == "__main__":

    from random import randrange

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
    
    def sortTask(data):
        print "SortTask starting for ", data
        numbers = range(data[0], data[1])
        # Swap each element with a randomly chosen one
        for i in range(len(numbers)):
            rnd = randrange(0, len(numbers))
            numbers[i], numbers[rnd] = numbers[rnd], numbers[i]
        print "SortTask sorting for ", data
        numbers.sort()
        print "SortTask done for ", data
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print "WaitTask starting for ", data
        print "WaitTask sleeping for %d seconds" % data
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        print "Callback called for", data

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()

A colleague showed me a very nice thread pool class he wrote in C++, using generic programming (templates). The code was so neat and concise, it almost resembled Python, so naturally I took the next step: actually implementing it in Python.

A thread pool is a construct used to simplify the use of threads and decrease the overhead of starting up worker threads for short-lived tasks. When the pool is created, it creates a number of worker threads, each of which waits for work to do. When the pool is given work, it dispatches the work to one of the waiting threads, which does the work, returns the result, and then goes back to waiting for more work. The pool can be dynamically resized (for example, based on feedback like the number of threads currently idle, or the number of unexecuted tasks in the queue).

Tasks are submitted to the pool with the queueTask() method. This method takes any callable as the task, an optional data argument that is passed to the task, and an optional callback that can be called when the task has completed. The data argument is a convenience feature; it could be eliminated by currying the data into the task.
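As a rough illustration of the currying remark, the sketch below uses functools.partial from the standard library. It is written for Python 3, and the scale() task is a hypothetical example that does not appear in the recipe.

```python
from functools import partial

def scale(factor, value):
    """Hypothetical task: multiply value by factor."""
    return factor * value

# Recipe style: the worker receives the task and its data separately
# and calls task(args).
task, args = partial(scale, 3), 14
result_with_args = task(args)

# Curried style: the data is baked into the callable, so a worker
# would only need to call task() with no arguments.
curried = partial(scale, 3, 14)
result_curried = curried()

print(result_with_args, result_curried)
```

With the curried form, the args parameter of queueTask() would no longer be needed, at the cost of building a new callable for every task.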

To shut down the thread pool, use the joinAll() method. joinAll() takes two optional arguments: waitForTasks and waitForThreads. If waitForTasks is true, joinAll() will block until the task queue is empty, that is, until every currently-queued task has been picked up by a worker. If waitForThreads is true, joinAll() will wait until all threads in the pool have exited before returning. Both parameters default to true.

A thread pool in Python is somewhat less useful than it is in other languages: the individual threads (for the most part) cannot run at the same time on multi-processor machines, because Python has a single Global Interpreter Lock (GIL) that must be held to execute any Python code. However, a pool can still be a good tool for organizing threaded tasks and for minimizing the overhead of thread creation and destruction.
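The "for the most part" qualifier matters in practice: blocking calls such as time.sleep() release the GIL, so tasks that spend their time waiting can still overlap. The following is a small sketch of that effect, written for Python 3 with plain threads rather than the pool above; io_bound_task is a hypothetical stand-in for a task that waits on I/O.

```python
import threading
import time

def io_bound_task(seconds):
    # time.sleep() releases the GIL while the thread is blocked
    time.sleep(seconds)

start = time.monotonic()
threads = [threading.Thread(target=io_bound_task, args=(0.2,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start

# The four waits overlap, so the total is close to 0.2 s rather than 0.8 s
print("elapsed: %.2f s" % elapsed)
```

CPU-bound tasks written in pure Python would not show this overlap, since each of them needs the GIL to make progress.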

17 comments

Anand 20 years, 9 months ago

Nice. I like this implementation. It is compact and well written. But do you really need the resize lock, since the thread pool will mostly be initialized with a size?

Anand Pillai

Tim Lesher (author) 20 years, 9 months ago

Correct... You're correct--the resize lock isn't necessary if you don't use dynamic resizing of the pool. In that case, the setThreadCount() method could be removed as well.

It's not that often that you really need a dynamically-sized thread pool, but in some narrow cases (long-running servers on loaded machines, with very bursty workloads), it can make sense.

William McLendon 20 years, 8 months ago

True / False. If you copy and run the code as written, python will error out with a traceback b/c 'True' and 'False' aren't defined (at least in python 2.2). Defining True and False fixed that and it works great :D

Tim Lesher (author) 20 years, 8 months ago

Yes... True and False were added to __builtins__ in 2.2.1. I should either eliminate them or add code to cope with earlier versions.

Tim Lesher (author) 20 years, 8 months ago

Done. I've added checks for True and False; this should now work on Python 2.2.

Kevin Saunders 20 years, 7 months ago

Why busy wait? Using a Condition Object's wait() and notify() you could make ThreadPoolThread::run() block until there was work to do, rather than waking up every tenth of a second to probe the pool.

This will be more efficient than busy waiting, unless there is almost always something on the task queue.
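A minimal sketch of the wait()/notify() idea, written for Python 3. The BlockingTaskQueue class, the worker() function, and the use of None as a stop signal are all hypothetical and are not part of the recipe.

```python
import threading

class BlockingTaskQueue:
    """Hypothetical helper, not part of the recipe."""

    def __init__(self):
        self._cond = threading.Condition()
        self._tasks = []

    def put(self, task):
        with self._cond:
            self._tasks.append(task)
            self._cond.notify()      # wake one waiting worker

    def get(self):
        with self._cond:
            while not self._tasks:   # re-check after every wakeup
                self._cond.wait()    # releases the lock while blocked
            return self._tasks.pop(0)

results = []
tasks = BlockingTaskQueue()

def worker():
    while True:
        task = tasks.get()           # blocks instead of polling
        if task is None:             # None is used here as a stop signal
            break
        results.append(task())

t = threading.Thread(target=worker)
t.start()
tasks.put(lambda: 6 * 7)
tasks.put(None)
t.join()
print(results)
```

The worker consumes no CPU while the queue is empty, which is the saving over sleeping and re-probing every tenth of a second.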

Mike Barrett 20 years, 7 months ago

Great Idea. This is a great idea. Thank you for providing it. I'm currently using this code in a log rotation script that I've written.

I've noticed, however, that the test you do at the beginning doesn't work in Python 2.3 (maybe others) if the code is imported to be used by other code. I replaced it with the following, which seems to work ok:

try:
    x = True
except NameError:
    False = 0
    True = not False

Luis Cortes 20 years, 6 months ago

Your joinAll doesn't work. Notice that you zero out your threads list before you wait on your join. You zero it out at __setThreadCountNolock(0). Therefore, by the time you get to your loop, there is NEVER anything to loop on.

I've mucked around with it. What I think you might need is, a WaitForEveryoneDone routine. Something that doesn't kill all your threads too. Your Code Your Choice, but I would like to see your new version. :-)

Happy Programming!

Tim Lesher (author) 20 years, 6 months ago

Thanks! I've implemented your version of the boolean test. Much more intuitive. Thanks!

Carl Kleffner 20 years, 2 months ago

joinAll reorganized. A little change to the joinAll method, and joinAll waits for all threads to exit before they are removed from the pool, as it should:

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(0.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.goAway()
                for t in self.__threads:
                    t.join()
                    # print t,"joined"
                    del t
            self.__setThreadCountNolock(0)
            self.__isJoining = True

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

Carl

cmkleffner (at) gmx (dot) de

Yuhui Huang 20 years, 2 months ago

Several modifications. First, I changed the implementation to use Queue.Queue for task queueing. This eliminates the busy-waiting in each worker thread. (To end a thread, you post a dummy task. Once the next available thread picks up the dummy task, it ends itself.)

Second, instead of a list, I use sets.Set to store the worker threads. Once a thread ends itself, it can call a threadpool function to remove itself from the set.

Third, instead of letting each worker thread call the callback function, I created another Queue for the pending callbacks. Periodically a main thread can call a function to process all pending callbacks. Letting the main thread process these callbacks makes more sense to me.

Fourth, I added a counter for running tasks. (Those have been picked up by worker threads, but haven't finished).

The final result: I'm using the threadpool in a GUI web application (think of multitab/MDI browser). When the program needs to launch a web request, it adds a task.

In the OnIdle event handler, I call threadpool.processCallback() to handle all pending callbacks (all finished web requests). I also check the running task counter, so I can display a "working" message on the UI.

P.S. I would love to post my code, but it's too long for a comment. I hope you get the idea from my description. :)
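The description above can be sketched roughly as follows. This is not the commenter's code: it is a hypothetical QueuePool class written for Python 3 (where the Queue module is named queue), covering only the task queue, the dummy stop task, and the main-thread callback queue. The thread set and the running-task counter are left out.

```python
import queue
import threading

_STOP = object()  # dummy task that tells a worker to exit

class QueuePool:
    """Hypothetical sketch; not the commenter's actual code."""

    def __init__(self, num_threads):
        self._tasks = queue.Queue()
        self._callbacks = queue.Queue()
        self._threads = [threading.Thread(target=self._work)
                         for _ in range(num_threads)]
        for t in self._threads:
            t.start()

    def _work(self):
        while True:
            item = self._tasks.get()       # blocks; no busy-waiting
            if item is _STOP:
                break
            task, args, callback = item
            result = task(args)
            if callback is not None:
                # Defer the callback instead of running it here
                self._callbacks.put((callback, result))

    def queue_task(self, task, args=None, callback=None):
        self._tasks.put((task, args, callback))

    def process_callbacks(self):
        """Run pending callbacks in the calling (e.g. main) thread."""
        count = 0
        while True:
            try:
                callback, result = self._callbacks.get_nowait()
            except queue.Empty:
                return count
            callback(result)
            count += 1

    def join_all(self):
        # One dummy task per worker, queued behind the real tasks
        for _ in self._threads:
            self._tasks.put(_STOP)
        for t in self._threads:
            t.join()

seen = []
pool = QueuePool(2)
for n in (1, 2, 3):
    pool.queue_task(lambda x: x * x, n, seen.append)
pool.join_all()
handled = pool.process_callbacks()
print(sorted(seen), handled)
```

In a GUI program, process_callbacks() would be called from the idle handler rather than once at the end, so that results reach the UI thread as they become available.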

robert della malva 19 years, 4 months ago

thread pool class in c++. Hi, does anyone know of links to a thread pool class in C++? Thanks, -Robert

patrick strateman 14 years, 12 months ago

The return in the try block means you're holding onto the lock ....

def queueTask(self, task, args=None, taskCallback=None):

    """Insert a task into the queue.  task must be callable;
    args and taskCallback can be None."""

    if self.__isJoining == True:
        return False
    if not callable(task):
        return False

    self.__taskLock.acquire()
    try:
        self.__tasks.append((task, args, taskCallback))
    finally:
        self.__taskLock.release()
    return True

Sam Gleske 14 years, 3 months ago

I'm processing a file.

Every time I process a file no matter how many lines it has...

pool = ThreadPool(3)

for line in proxyList:
    pool.queueTask(checkProxy, line, handleResult)

pool.joinAll()
print "Process Ended:", time.ctime()

Here pool = ThreadPool(3) is at the beginning of the file, the for loop is somewhere in the middle, and pool.joinAll() comes after the for loop. The statement print "Process Ended:", time.ctime() is the very last line in the file.

I get console output similar to this:

server port
server port
server port
server port
server port
Process Ended: Thu Dec 03 06:30:34 2009
server port
server port
server port

It doesn't matter if my file is 10 lines or 1000 lines. It always processes the first X amount of lines correctly and then runs joinAll 3 threads too soon. Everything after Process Ended: Thu Dec 03 06:30:34 2009 does not get processed correctly.

Any ideas as to how this can happen? Maybe there's a bug in the above script. I can't figure it out. I can post my original source if need be.

Sam Gleske 14 years, 3 months ago

I added the same thing to your example. I added from time import ctime and on the last line I put print "Process Ended:", ctime() after the joinAll function.

The same thing happened with your provided example. Below is what my console showed; I ran it many times and got the same result each time.

SortTask starting for  (1000, 100000)
 SortTask starting for WaitTask starting for   (5200
, WaitTask sleeping for 5 seconds200000
)
SortTask sorting for  (1000, 100000)
SortTask done for  (1000, 100000)
Callback called for ('Sorter ', (1000, 100000))
WaitTask starting for  2
WaitTask sleeping for 2 seconds
SortTask sorting for  (200, 200000)
SortTask done for  (200, 200000)
Callback called for ('Sorter ', (200, 200000))
SortTask starting for  (3, 30000)
SortTask sorting for  (3, 30000)
SortTask done for  (3, 30000)
Callback called for ('Sorter ', (3, 30000))
WaitTask starting for  7
WaitTask sleeping for 7 seconds
Process Ended: Thu Dec 03 06:51:21 2009
Callback called for ('Waiter', 2)
Callback called for ('Waiter', 5)
Callback called for ('Waiter', 7)

I guess there's a bug in the script. If I figure out what it is then I'll post a fix. If you figure it out before I do then please post a reply with how to fix it. SAM

Sam Gleske 14 years, 3 months ago

I have also discovered that pool = ThreadPool(4) excludes the last 4 instructions, pool = ThreadPool(5) excludes the last 5 instructions, and so on.

Emilio Monti 13 years, 11 months ago

Since Python 2.5 it is much easier to write a ThreadPool: http://code.activestate.com/recipes/577187-python-thread-pool/