Thread pool that grows on demand.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 | #! /usr/bin/python
import threading
import Queue
import time
import sys
Instance = None
def getInstance():
global Instance
if not Instance:
Instance = ThreadPool()
return Instance
class ThreadPool:
def __init__(self,maxWorkers = 10):
self.tasks = Queue.Queue()
self.workers = 0
self.working = 0
self.maxWorkers = maxWorkers
self.allKilled = threading.Event()
self.countLock = threading.RLock()
self.timers = {}
self.timersLock = threading.Lock()
self.timersThreadLock = threading.Lock()
self.timersEvent = threading.Event()
self.allKilled.set()
def run(self,target,callback = None, *args, **kargs):
""" starts task.
target = callable to run with *args and **kargs arguments.
callback = callable executed when target ends
callback sould accept one parameter where target's
return value is passed.
If callback is None it's ignored.
"""
self.countLock.acquire()
if not self.workers:
self.addWorker()
self.countLock.release()
self.tasks.put((target,callback,args,kargs))
def setMaxWorkers(self,num):
""" Sets the maximum workers to create.
num = max workers
If number passed is lower than active workers
it will kill workers to match that number.
"""
self.countLock.acquire()
self.maxWorkers = num
if self.workers > self.maxWorkers:
self.killWorker(self.workers - self.maxWorkers)
self.countLock.release()
def addWorker(self,num = 1):
""" Add workers.
num = number of workers to create/add.
"""
for x in xrange(num):
self.countLock.acquire()
self.workers += 1
self.allKilled.clear()
self.countLock.release()
t = threading.Thread(target = self.__workerThread)
t.setDaemon(True)
t.start()
def killWorker(self,num = 1):
""" Kill workers.
num = number of workers to kill.
"""
self.countLock.acquire()
if num > self.workers:
num = self.workers
self.countLock.release()
for x in xrange(num):
self.tasks.put("exit")
def killAllWorkers(self,wait = None):
""" Kill all active workers.
wait = seconds to wait until last worker ends
if None it waits forever.
"""
self.countLock.acquire()
self.killWorker(self.workers)
self.countLock.release()
self.allKilled.wait(wait)
def __workerThread(self):
while True:
task = self.tasks.get()
# exit is "special" tasks to kill thread
if task == "exit":
break
self.countLock.acquire()
self.working += 1
if (self.working >= self.workers) and (self.workers < self.maxWorkers): # create thread on demand
self.addWorker()
self.countLock.release()
fun,cb,args,kargs = task
try:
ret = fun(*args,**kargs)
if cb:
cb(ret)
except:
print "Unexpected error:", sys.exc_info()
self.countLock.acquire()
self.working -= 1
self.countLock.release()
self.countLock.acquire()
self.workers -= 1
if not self.workers:
self.allKilled.set()
self.countLock.release()
def timer(self, cb, period):
""" Add or remove timers.
cb = callback function.
period = period in seconds (float)
if period is 0 timer is deleted.
"""
self.run(self.__timerThread, None, cb, period)
def __timerThread(self, cb, period):
self.timersLock.acquire()
self.timersEvent.set()
if not period:
if cb in self.timers:
del(self.timers[cb])
self.timersLock.release()
return
self.timers[cb] = [period,time.time()]
self.timersLock.release()
if not self.timersThreadLock.acquire(0):
return
while True:
self.timersLock.acquire()
if len(self.timers) == 0:
self.timersThreadLock.release()
self.timersLock.release()
break
minWait = 30*24*3600
now = time.time()
for k,v in self.timers.items():
period, last = v
wait = period - (now - last)
if wait <=0:
self.run(k)
wait = period
v[1] = now
if wait < minWait:
minWait = wait
self.timersLock.release()
self.timersEvent.wait(minWait)
self.timersEvent.clear()
|
This is a flexible thread pool class, it can increment workers when needed.
The main methods you need are run(), setMaxWorkers() and killAllWorkers().
11/03/2009: Added timeout callback system.
Un +1 ;)