
A simple Python ThreadPool based on the standard library's Queue object.

Python, 53 lines
# Python 3; on Python 2 use "from Queue import Queue" instead
from queue import Queue
from threading import Thread

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True  # daemon threads don't keep the process alive at exit
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Report the error and keep the worker alive
                print(e)
            finally:
                # Always mark the task done, even if func raised
                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print('sleeping for (%d)sec' % d)
        sleep(d)

    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        # print the percentage of tasks placed in the queue
        print('%.2f%%' % (float(i) / len(delays) * 100.0))

        # 2) Add the task to the queue
        pool.add_task(wait_delay, d)

    # 3) Wait for completion
    pool.wait_completion()
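
For comparison, Python 3's standard library ships concurrent.futures.ThreadPoolExecutor, which provides a similar pool out of the box; each submit() call returns a Future that holds the function's return value. The following is a sketch of the same demo on top of it, not part of the original recipe; the shortened delays and the run_demo() helper are assumptions made so the example finishes quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from random import randrange
from time import sleep

def wait_delay(d):
    """Same idea as the recipe's demo task, but it returns a value."""
    sleep(d / 100.0)  # shortened delay (assumption) so the sketch runs quickly
    return d

def run_demo(num_tasks=20, num_threads=5):
    """Hypothetical helper: run num_tasks delays on a pool of num_threads."""
    delays = [randrange(1, 10) for _ in range(num_tasks)]
    # Leaving the 'with' block waits for all submitted tasks,
    # much like ThreadPool.wait_completion() in the recipe.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(wait_delay, d) for d in delays]
    # Each Future holds its task's return value (or re-raises its exception);
    # the futures list is in submission order, so results line up with delays.
    return delays, [f.result() for f in futures]

delays, results = run_demo()
print(results == delays)
```

The design trade-off: the recipe's pool only runs callables, so results have to be passed out through shared state, while the executor hands them back directly through futures.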

8 comments

Emilio Monti (author) 14 years ago

Thanks to Riccardo Govoni for the code review.

Charles Sibbald 13 years, 5 months ago

Hi Emilio,

What's the best way to get "callbacks", or to get the objects returned by a function run in this thread pool?

I have the following:

def myfunction(c, t):
    versionList = []
    #code does some processing, and populates a list

    return versionList

def threadedCompileVersions():
    components = ("component_a", "component_b", "component_c", "component_d")
    types = ("Beta", "RC", "GA")
    nullKey = ("No Releases Yet",)
    deployableVersions = {}
    threadList = []
    # initialise list names dynamically
    for component in components:
        for type in types:
            threadList.append("_".join([component, type]))
    print(threadList)
    pool = ThreadPool(len(threadList))
    for item in threadList:
        # rsplit on the last "_" only, since component names contain "_" too
        threadArgs = item.rsplit("_", 1)
        print("Processing versions with args : {0}, {1}".format(threadArgs[0], threadArgs[1]))
        pool.add_task(myfunction, threadArgs[0], threadArgs[1])
    pool.wait_completion()

    # At this point I would need to populate the deployableVersions dict with values from the threads that have run.

So currently I have my function running in a threaded manner using your code, and that works great. Could you please suggest how I can pull values back from it?

Emilio Monti (author) 13 years, 2 months ago

I am sorry, Charles, I only read your comment now.

The simplest solution to your problem is to pass the "deployableVersions" dictionary as an additional argument to "myfunction", i.e.:

def myfunction(c, t, d):
    ...

pool.add_task(myfunction, threadArgs[0], threadArgs[1], deployableVersions)
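
A minimal, self-contained sketch of that suggestion, using a Python 3 copy of the recipe's pool. The body of myfunction, the version strings it produces, and results_lock are hypothetical placeholders; the lock keeps the writes to the shared dict explicitly synchronized rather than relying on interpreter details.

```python
from queue import Queue
from threading import Thread, Lock

class Worker(Thread):
    """Thread executing tasks from a given tasks queue (recipe, Python 3)."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

results_lock = Lock()  # hypothetical: guards writes to the shared dict

def myfunction(c, t, d):
    # Placeholder for the real processing: builds a hypothetical version list
    versionList = ["%s-%s-1.0" % (c, t)]
    # Store the result in the dict passed in as the extra argument
    with results_lock:
        d["_".join([c, t])] = versionList

deployableVersions = {}
pool = ThreadPool(4)
for c in ("component_a", "component_b"):
    for t in ("Beta", "RC", "GA"):
        pool.add_task(myfunction, c, t, deployableVersions)
pool.wait_completion()  # after this, every task has written its entry
print(sorted(deployableVersions))
```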

norlesh 13 years, 2 months ago

Hi Emilio, thanks for the handy code.

I figured I would post a heads-up for anybody who runs into a problem similar to the one I had.

Some of my tasks add further tasks back into the pool before they complete, and this was causing intermittent problems, with all of my worker threads eventually getting stuck.

It turns out that the task queue was reaching capacity and then blocking any additional put() calls, until the queue filled up with tasks whose workers were all blocked waiting to add their subtasks. The fix was easy: by initialising the queue as Queue(0) around line 22 (the self.tasks = Queue(num_threads) line in ThreadPool.__init__), the queue is allowed to grow as required, which prevents this type of deadlock.
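
A sketch of that fix, assuming a Python 3 copy of the recipe in which only the queue size changes. The explore task, which records itself and then adds two subtasks, is a hypothetical workload chosen to mimic tasks that enqueue further tasks. Because each subtask is put() before its parent calls task_done(), wait_completion() does not return until the whole tree of tasks has run.

```python
from queue import Queue
from threading import Thread, Lock

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        # maxsize=0 means unbounded: put() never blocks, so a running task
        # can always enqueue its subtasks (the fix described in the comment)
        self.tasks = Queue(0)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

visited = []
visited_lock = Lock()

def explore(pool, depth):
    # Hypothetical task: record this node, then add two child tasks
    with visited_lock:
        visited.append(depth)
    if depth < 6:
        pool.add_task(explore, pool, depth + 1)
        pool.add_task(explore, pool, depth + 1)

pool = ThreadPool(2)
pool.add_task(explore, pool, 0)
pool.wait_completion()
print(len(visited))  # a full binary tree of depth 6 has 2**7 - 1 nodes
```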

thanks again - Shane Norris

Victor R. Volkman 10 years, 10 months ago

I am getting intermittent failures with this, about one run in ten, with Python 2.7.3:

Traceback (most recent call last):
  File "/tools/python/python/lib/python2.7/threading.py", line 551, in __bootstrap_inner
  File "/fast/pq/bin/pylib/ThreadPool.py", line 15, in run
  File "/tools/python/python/lib/python2.7/Queue.py", line 168, in get
  File "/tools/python/python/lib/python2.7/threading.py", line 236, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable

It is in fact line 15 from the code you posted here:

func, args, kargs = self.tasks.get()

christoph.gysin 9 years, 4 months ago

What Victor is experiencing is a race condition when the process terminates:

http://bugs.python.org/issue14623

To solve this, you need to add a way to terminate a Worker and implement a destructor for ThreadPool that stops all Workers.
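
A minimal sketch of that approach, assuming None is never submitted as a real task, so it can serve as a "stop" sentinel. Instead of relying only on __del__ (Python does not guarantee that destructors run for objects still alive at interpreter exit), this version adds an explicit close(), which is also called when the pool is used in a with block. The workers are non-daemon and joined, so none of them should still be blocked in Queue.get() while the interpreter shuts down. The names close() and workers are assumptions, not part of the original recipe.

```python
from queue import Queue
from threading import Thread

class Worker(Thread):
    """Worker that exits when it receives a None sentinel."""
    def __init__(self, tasks):
        Thread.__init__(self)  # non-daemon: close() must join it
        self.tasks = tasks
        self.start()

    def run(self):
        while True:
            item = self.tasks.get()
            try:
                if item is None:  # sentinel: stop this worker
                    return
                func, args, kargs = item
                try:
                    func(*args, **kargs)
                except Exception as e:
                    print(e)
            finally:
                # Runs for real tasks and for the sentinel alike
                self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

    def close(self):
        """Stop all workers: queue one sentinel per worker, then join them."""
        # Sentinels go in after any pending tasks, so those still run first
        for _ in self.workers:
            self.tasks.put(None)
        for w in self.workers:
            w.join()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False

out = Queue()  # thread-safe container for the demo's results
with ThreadPool(4) as pool:
    for i in range(10):
        pool.add_task(out.put, i)
    pool.wait_completion()
collected = sorted(out.get() for _ in range(10))
print(collected)
```

Each worker consumes exactly one sentinel and then returns, so putting one sentinel per worker stops them all.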

Rajesh 8 years, 5 months ago

Hi,

I am facing the issue below:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.4/threading.py", line 442, in __bootstrap
    self.run()
  File "/mnt/images/R_FPT_41.4.1.31.atca64.r.1511050545.393250/x86_64/std/opt/nsn/SS_FirmwareATCA/scripts/ADSP1_B_lynxfpt_threadpool.py", line 30, in run
    func, args, kargs = self.tasks.get()
  File "/opt/nokiasiemens/SS_Firmware/bin/multiprocessing/queues.py", line 91, in get
    res = self._recv()
EOFError

Can somebody please help? I am not able to find the reason for this failure.

Rajesh 8 years, 5 months ago

I am facing the issue below:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.4/threading.py", line 442, in __bootstrap
    self.run()
  in run
    func, args, kargs = self.tasks.get()
  in multiprocessing/queues.py, line 91, in get
    res = self._recv()
EOFError

Can somebody please help? I am not able to find the reason for this failure.