A thread pool class that takes arbitrary callables as work units, and supports callbacks when the work unit is complete.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
import threading from time import sleep # Ensure booleans exist (not needed for Python 2.2.1 or higher) try: True except NameError: False = 0 True = not False class ThreadPool: """Flexible thread pool class. Creates a pool of threads, then accepts tasks that will be dispatched to the next available thread.""" def __init__(self, numThreads): """Initialize the thread pool with numThreads workers.""" self.__threads =  self.__resizeLock = threading.Condition(threading.Lock()) self.__taskLock = threading.Condition(threading.Lock()) self.__tasks =  self.__isJoining = False self.setThreadCount(numThreads) def setThreadCount(self, newNumThreads): """ External method to set the current pool size. Acquires the resizing lock, then calls the internal version to do real work.""" # Can't change the thread count if we're shutting down the pool! if self.__isJoining: return False self.__resizeLock.acquire() try: self.__setThreadCountNolock(newNumThreads) finally: self.__resizeLock.release() return True def __setThreadCountNolock(self, newNumThreads): """Set the current pool size, spawning or terminating threads if necessary. Internal use only; assumes the resizing lock is held.""" # If we need to grow the pool, do so while newNumThreads > len(self.__threads): newThread = ThreadPoolThread(self) self.__threads.append(newThread) newThread.start() # If we need to shrink the pool, do so while newNumThreads < len(self.__threads): self.__threads.goAway() del self.__threads def getThreadCount(self): """Return the number of threads in the pool.""" self.__resizeLock.acquire() try: return len(self.__threads) finally: self.__resizeLock.release() def queueTask(self, task, args=None, taskCallback=None): """Insert a task into the queue. task must be callable; args and taskCallback can be None.""" if self.__isJoining == True: return False if not callable(task): return False self.__taskLock.acquire() try: self.__tasks.append((task, args, taskCallback)) return True finally: self.__taskLock.release() def getNextTask(self): """ Retrieve the next task from the task queue. For use only by ThreadPoolThread objects contained in the pool.""" self.__taskLock.acquire() try: if self.__tasks == : return (None, None, None) else: return self.__tasks.pop(0) finally: self.__taskLock.release() def joinAll(self, waitForTasks = True, waitForThreads = True): """ Clear the task queue and terminate all pooled threads, optionally allowing the tasks and threads to finish.""" # Mark the pool as joining to prevent any more task queueing self.__isJoining = True # Wait for tasks to finish if waitForTasks: while self.__tasks != : sleep(.1) # Tell all the threads to quit self.__resizeLock.acquire() try: self.__setThreadCountNolock(0) self.__isJoining = True # Wait until all threads have exited if waitForThreads: for t in self.__threads: t.join() del t # Reset the pool for potential reuse self.__isJoining = False finally: self.__resizeLock.release() class ThreadPoolThread(threading.Thread): """ Pooled thread class. """ threadSleepTime = 0.1 def __init__(self, pool): """ Initialize the thread and remember the pool. """ threading.Thread.__init__(self) self.__pool = pool self.__isDying = False def run(self): """ Until told to quit, retrieve the next task and execute it, calling the callback if any. """ while self.__isDying == False: cmd, args, callback = self.__pool.getNextTask() # If there's nothing to do, just sleep a bit if cmd is None: sleep(ThreadPoolThread.threadSleepTime) elif callback is None: cmd(args) else: callback(cmd(args)) def goAway(self): """ Exit the run loop next time through.""" self.__isDying = True # Usage example if __name__ == "__main__": from random import randrange # Sample task 1: given a start and end value, shuffle integers, # then sort them def sortTask(data): print "SortTask starting for ", data numbers = range(data, data) for a in numbers: rnd = randrange(0, len(numbers) - 1) a, numbers[rnd] = numbers[rnd], a print "SortTask sorting for ", data numbers.sort() print "SortTask done for ", data return "Sorter ", data # Sample task 2: just sleep for a number of seconds. def waitTask(data): print "WaitTask starting for ", data print "WaitTask sleeping for %d seconds" % data sleep(data) return "Waiter", data # Both tasks use the same callback def taskCallback(data): print "Callback called for", data # Create a pool with three worker threads pool = ThreadPool(3) # Insert tasks into the queue and let them run pool.queueTask(sortTask, (1000, 100000), taskCallback) pool.queueTask(waitTask, 5, taskCallback) pool.queueTask(sortTask, (200, 200000), taskCallback) pool.queueTask(waitTask, 2, taskCallback) pool.queueTask(sortTask, (3, 30000), taskCallback) pool.queueTask(waitTask, 7, taskCallback) # When all tasks are finished, allow the threads to terminate pool.joinAll()
A colleague showed me a very nice thread pool class he wrote in C++, using generic programming (templates). The code was so neat and concise, it almost resembled Python, so naturally I took the next step: actually implementing it in Python.
A thread pool is a construct used to simply the use of threads and decrease the overhead of starting up worker threads for short-lived tasks. When the pool is created, it creates a number of worker threads, each of which waits for work to do. When the pool is given work, it dispatches the work to one of the waiting threads, which does the work, returns the result, and then goes back to waiting for more work. The pool can be dynamically resized (for example, based on feedback like the number of threads currently idle, or the number of unexecuted tasks in the queue).
Tasks are submitted to the pool with the queueTask() method. This method takes any callable as the task, an optional data argument that is passed to the task, and an optional callback that can be called when the task has completed. The data argument is a convenience feature; it could be eliminated by currying the data into the task.
To shut down the thread pool, use the joinAll() method. joinAll() takes two optional arguments: waitForTasks and waitForThreads. If waitForTasks is true, joinAll will block until any currently-queued tasks have been completed. If waitForThreads is true, joinAll will wait until all threads in the pool have exited before returning. Both parameters default to true.
A thread pool in Python is somewhat less useful than it is in other languages, because the individual threads (for the most part) cannot run at the same time on multi-processor machines, because Python has a single Global Interpreter Lock (GIL) that must be held to execute any Python code. However, it can still be a good tool for better organizing threaded tasks, and minimizing the overhead of thread creation and destruction.
Nice. I like this implementation. It is compact, and
well written. But do you really need the resize
lock, since the thread pool will mostly be initialized
with a size ?
Correct... You're correct--the resize lock isn't necessary if you don't use dynamic resizing of the pool. In that case, the setThreadCount() method could be removed as well.
It's not that often that you really need a dynamically-sized thread pool, but in some narrow cases (long-running servers on loaded machines, with very bursty workloads), it can make sense.
True / False. If you copy and run the code as written, python will error out with a traceback b/c 'True' and 'False' aren't defined (at least in python 2.2). Defining True and False fixed that and it works great :D
Yes... True and False were added to __builtins__ in 2.2.1. I should either eliminate them or add code to cope with earlier versions.
Done. I've added checks for True and False; this should now work on Python 2.2.
Why busy wait? Using a Condition Object's wait() and notify() you could make ThreadPoolThread::run() block until there was work to do, rather than waking up every tenth of a second to probe the pool.
This will be more efficient than busy waiting, unless there is almost always something on the task queue.
Great Idea. This is a great idea. Thank you for providing it. I'm currently using this code in a log rotation script that I've written.
I've noticed, however, that the test you do at the beginning doesn't work in Python 2.3 (maybe others) if the code is imported to be used by other code. I replaced it with the following, which seems to work ok:
You JoinAll doesn't work. Notice that you zero out your threads list before you wait on your join. You zero it out at __setThreadCountNoLock(0). Therefore by the time you get to your loop there is NEVER anything to loop on.
I've mucked around with it. What I think you might need is, a WaitForEveryoneDone routine. Something that doesn't kill all your threads too. Your Code Your Choice, but I would like to see your new version. :-)
Thanks! I've implemented your version of the boolean test. Much more intuitive. Thanks!
joinAll reorganized. a little change to the joinAll method, and joinAll waits for all threads to be deleted as it should be:
Several modifications. First, I changed the implementation to use Queue.Queue for task-queueing. This eliminates the busy-waiting in each worker thread. (To end a thread, you post a dummy task. Once the next availible thread picks up the dummy task, it ends itself).
Second, instead of list, I use sets.Set to store the worker threads. One a thread ends itself, it can call a threadpool function to remove itself from the set.
Third, instead of letting each worker thread call the callback function, I created another Queue for the pending callbacks. Periodically a main thread can call a function to process all pending callbacks. Letting the main thread process these callbacks makes more sense to me.
Fourth, I added a counter for running tasks. (Those have been picked up by worker threads, but haven't finished).
The final result: I'm using the threadpool in a GUI web application (think of multitab/MDI browser). When the program needs to launch a web request, it adds a task.
In the OnIdle event handler, I call threadpool.processCallback() to handle all pending callbacks (all finished web requests). I also check the running task counter, so I can display a "working" message on the UI.
p.s. I would love to post my code. But its too long for a comment. But hope you get the idea from my description. :)
thread pool class in c++. Hi, Anyone knows about links to thread pool class in c++ Thanks, -Robert
The return in the try block means you're holding onto the lock ....
def queueTask(self, task, args=None, taskCallback=None):
I'm processing a file.
Every time I process a file no matter how many lines it has...
Where pool = ThreadPool(3) is at the beginning of the file. The for loop is somewhere in the middle. pool.joinAll() is after the for loop. And print print "Process Ended:", time.ctime() is the very last line in the file
I get console output similar to this:
It doesn't matter if my file is 10 lines or 1000 lines. It always processes the first X amount of lines correctly and then runs joinAll 3 threads too soon. Everything after Process Ended: Thu Dec 03 06:30:34 2009 does not get processed correctly.
Any ideas as to how this can happen? Maybe there's a bug in the above script. I can't figure it out. I can post my original source if need be.
I added the same thing to your example. I added from time import ctime and on the last line I put print "Process Ended:", ctime() after the joinAll function.
The same thing happened to your provided example. Below is what my console showed after running many times I got the same result.
I guess there's a bug in the script. If I figure out what it is then I'll post a fix. If you figure it out before I do then please post a reply with how to fix it. SAM
I have also discovered that pool = ThreadPool(4) excludes the last 4 instructions, pool = ThreadPool(5) excludes the last 5 instructions, and so on.
Since Python 2.5 it is much easier to write a ThreadPool: http://code.activestate.com/recipes/577187-python-thread-pool/