Welcome, guest | Sign In | My Account | Store | Cart

Easy to use Thread Pool with a dynamically adjustable pool size.

Python, 94 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from threading import Thread
from Queue import Queue
from time import sleep

from tools import propertx # from http://code.activestate.com/recipes/502243/

class Pool(object):
    def __init__(self, Psize=20, Qsize=20):
        self.workers = []
        self.jobs = Queue(Qsize)
        self.size=Psize
        self.accept=True
    @propertx
    def size():
        def set(self, newSize):
            for i in xrange(newSize-len(self.workers)):
                new = Worker(self)
                self.workers.append(new)
                new.start()
            for i in xrange(len(self.workers)-newSize):
                self.jobs.put((None, None, None, None))
        def get(self):
            return len(self.workers)
        return get, set
    def add(self, job, callback=None, *args, **kw):
        if self.accept:
            self.jobs.put((job, args, kw, callback))
    def join(self):
        self.accept=False
        while True:
            for w in self.workers:
                if w.isAlive() :
                    self.jobs.put((None, None, None, None))
                    break
            else:
                break
    @property
    def qsize(self): return self.jobs.qsize()

class Worker(Thread):
    def __init__(self, pool):
        Thread.__init__(self)
        self.pool = pool
        self.cmd=''
    def run(self):
        get=self.pool.jobs.get
        while True:
            cmd, args, kw, callback = get()
            if cmd:
                self.cmd=cmd.__name__
                if callback:
                    callback(cmd(*args, **kw))
                else:
                    cmd(*args, **kw)
                self.cmd=''
            else:
                self.pool.workers.remove(self)
                break

if __name__=='__main__':

    def easyJob(*arg, **kw):
        n=arg[0]
        print '\tSleep\t\t', n
        sleep(n)
        return 'Slept\t%d' % n
    def longJob(*arg, **kw):
        print "\tStart\t\t\t", arg, kw
        n=arg[0]*3
        sleep(n)
        return "Job done in %d" % n
    def badJob(*a, **k):
        print '\n !!! OOOPS !!!\n'
        a=1/0
    def show(*arg, **kw):
        print 'callback : %s' % arg[0]

    pool = Pool(5, 50)
    print "\n\t\t... let's add some jobs ...\n"
    for j in range(5):
        if j==1: pool.add(badJob)
        for i in range(5,0,-1):
            pool.add(longJob, show, i)
            pool.add(easyJob, show, i)
    print '''
        \t\t... and now, we're waiting for the %i workers to get the %i jobs done ...
    ''' % (pool.size, pool.qsize)
    sleep(15)
    print "\n\t\t... ok, that may take a while, let's get some reinforcement ...\n"
    sleep(5)
    pool.size=50
    print '\n\t\t... Joining ...\n'
    pool.join()
    print '\n\t\t... Ok ...\n'

3 comments

Alexander Artemenko 13 years ago  # | flag
  • Too complex, why do you need to feed workers with all these empty (None, None, None, None) tuples?
  • Why do you remove workers when queue is empty?
  • Where do you create new workers in case, if all jobs run out, and then another job was placed in the queue?
  • Can I have one job, which adds another jobs to the queue?
  • So many questions, isn't it, huh? :-)
tiago coutinho 12 years, 11 months ago  # | flag

sorry, my last post was wrong:

def size():

is correct! Please ignore my last post.

tiago coutinho 12 years, 11 months ago  # | flag

I believe there is a bug in the code: the method size is missing the self parameter: - It is currently: line 14: def size():

  • Should be: line 14: def size(self):