A simple Python ThreadPool based on the standard library's Queue object.
from Queue import Queue
from threading import Thread

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception, e: print e
            self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    delays = [randrange(1, 10) for i in range(100)]

    from time import sleep
    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        # print the percentage of tasks placed in the queue
        print '%.2f%c' % ((float(i)/float(len(delays)))*100.0,'%')

        # 2) Add the task to the queue
        pool.add_task(wait_delay, d)

    # 3) Wait for completion
    pool.wait_completion()
Thanks to Riccardo Govoni for the code review.
Hi Emilio,
What's the best way to get "callbacks", or to get the objects returned by a function run in this thread pool?
I have the following:
So currently my function runs in a threaded manner using your code, and that works great. Could you suggest how I can pull values back from it?
I am sorry, Charles, I only just read your comment.
The simplest solution to your problem is to pass the "deployableVersions" dictionary as an additional argument to "myfunction", so that each task can store its result in it.
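The idea of passing a shared dictionary to the task could be sketched roughly as follows. Only the names "myfunction" and "deployableVersions" come from the comments above; the body of "myfunction", the task names, and the pool size are hypothetical placeholders. The pool is the one from the post, adjusted just enough to import on both Python 2.6+ and Python 3.

```python
try:
    from queue import Queue        # Python 3
except ImportError:
    from Queue import Queue        # Python 2, as in the post
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()


def myfunction(name, deployableVersions):
    # Hypothetical body: the real function would compute something useful.
    # Each task writes under its own key, so tasks do not overwrite each other.
    deployableVersions[name] = len(name)


deployableVersions = {}
pool = ThreadPool(4)
for name in ['alpha', 'beta', 'gamma']:
    pool.add_task(myfunction, name, deployableVersions)

# Read the results only after every task has finished
pool.wait_completion()
print(sorted(deployableVersions.items()))
```

Reading the dictionary only after wait_completion() returns is what makes this safe here; if tasks had to update the same key, a threading.Lock around the update would be the more cautious choice.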
Hi Emilio, thanks for the handy code.
Figured I would post a heads-up for anybody who runs into a problem similar to the one I had.
Some of my tasks add additional tasks back into the pool before they complete, and this was causing intermittent problems, with all of my worker threads eventually dying.
It turns out that the task queue was reaching capacity and then blocking any additional put() calls, so it eventually filled up with tasks that were themselves blocked waiting to add their subtasks. The fix was easy: initialising the queue with Queue(0) (around line 22, in ThreadPool.__init__) allows it to expand as required, which prevents this type of deadlock.
thanks again - Shane Norris
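A minimal sketch of the change Shane describes, assuming the only difference from the post is the Queue(0) call. The "parent" and "child" tasks and the numbers used are hypothetical, chosen so that the bounded queue from the post would be likely to fill up; the imports are adjusted to work on both Python 2.6+ and Python 3.

```python
try:
    from queue import Queue        # Python 3
except ImportError:
    from Queue import Queue        # Python 2, as in the post
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from an unbounded queue"""
    def __init__(self, num_threads):
        # A maxsize of 0 means "no limit", so put() never blocks a worker
        self.tasks = Queue(0)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()


results = []


def child(n):
    # Hypothetical subtask: record a square
    results.append(n * n)


def parent(pool, count):
    # Hypothetical task that adds subtasks back into the same pool
    for i in range(count):
        pool.add_task(child, i)


pool = ThreadPool(2)
for _ in range(3):
    pool.add_task(parent, pool, 5)
pool.wait_completion()
print(len(results))
```

The trade-off is that an unbounded queue no longer applies back-pressure to whoever adds tasks, so memory use can grow if tasks are produced much faster than they are consumed.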
I am getting intermittent failures with this, about one in ten times with Python 2.7.3
Traceback (most recent call last):
  File "/tools/python/python/lib/python2.7/threading.py", line 551, in __bootstrap_inner
  File "/fast/pq/bin/pylib/ThreadPool.py", line 15, in run
  File "/tools/python/python/lib/python2.7/Queue.py", line 168, in get
  File "/tools/python/python/lib/python2.7/threading.py", line 236, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
It is in fact line 15 from the code you posted here:
What Victor is experiencing is a race condition when the process terminates:
http://bugs.python.org/issue14623
To solve this, you need to add a way to terminate a Worker and implement a destructor for ThreadPool that stops all Workers.
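One possible shape for this, as a sketch rather than a definitive fix: each Worker exits when it receives a sentinel value, and the pool keeps references to its Workers so it can stop and join them. The STOP sentinel, the "workers" attribute, and the close() method are hypothetical names, not part of the original code. The sketch uses an explicit close() call instead of a __del__ destructor, since the timing of finalizers at interpreter shutdown is not guaranteed.

```python
try:
    from queue import Queue        # Python 3
except ImportError:
    from Queue import Queue        # Python 2, as in the post
from threading import Thread

STOP = None  # hypothetical sentinel: a queue entry of None means "exit"


class Worker(Thread):
    """Thread executing tasks from a queue until it receives STOP"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            task = self.tasks.get()
            if task is STOP:
                self.tasks.task_done()
                break
            func, args, kargs = task
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()


class ThreadPool:
    """Pool of threads that can be shut down explicitly"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        # Keep references so the workers can be joined later
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

    def close(self):
        """Hypothetical method: stop every worker and wait for it to exit"""
        for _ in self.workers:
            self.tasks.put(STOP)
        for worker in self.workers:
            worker.join()


squares = []
pool = ThreadPool(3)
for n in range(5):
    pool.add_task(squares.append, n * n)
pool.wait_completion()
pool.close()
print(sorted(squares))
```

Calling close() before the program exits means no worker is still blocked inside Queue.get() while the interpreter tears down its modules, which is the situation the linked bug report describes.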
Hi,
I am facing the issue below:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.4/threading.py", line 442, in __bootstrap
    self.run()
  File "/mnt/images/R_FPT_41.4.1.31.atca64.r.1511050545.393250/x86_64/std/opt/nsn/SS_FirmwareATCA/scripts/ADSP1_B_lynxfpt_threadpool.py", line 30, in run
    func, args, kargs = self.tasks.get()
  File "/opt/nokiasiemens/SS_Firmware/bin/multiprocessing/queues.py", line 91, in get
    res = self._recv()
EOFError
Can somebody please help? I am not able to find the reason for this failure.