Welcome, guest | Sign In | My Account | Store | Cart

Cooperative multitasking offers an alternative to using threads. It can be harder to use in some cases (blocking IO) but in other it can be much easier (sharing data between tasks). This recipe shows how to use generators to achieve simple, cooperative multitasking, that allows thousends of 'simultaneously' running tasks.

Python, 53 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from collections import deque

class Task:
    def __init__(self, pool):
        self.generator = self.main()
        pool.add(self)
    def main(self):
        "Must be a generator"
        pass

class TaskPool:
    """
    NOTE max speed ~~ 20000 task switches per second per 100MHz
    NOTE using pyrex or psyco ~~ 25% speed improvement
    NOTE ram usage ~~ 1KB per task
    """
    def __init__(self):
        self.tasks = deque()
    def add(self, task):
        self.tasks.append(task)
    def iteration(self, iter_cnt=1):
        tasks = self.tasks
        for i in range(iter_cnt):
            try:
                tasks[0].generator.next()
                tasks.rotate(-1)
            except StopIteration:
                del tasks[0]
            except IndexError:
                # allow internal exception to propagate
                if len(tasks) > 0: raise


#### EXAMPLE #########################################################

class ExampleTask(Task):
    def __init__(self, pool, name, max_iterations):
        self.name = name
        self.max_iterations = max_iterations
        Task.__init__(self, pool)
    def main(self):
        i = 0
        while i < self.max_iterations:
            print self.name, i
            i += 1
            yield 0
        print self.name, 'finishing'

pool = TaskPool()
task_a = ExampleTask(pool, 'AAA',  5)
task_b = ExampleTask(pool, 'bbb', 10)
for i in xrange(100):
    pool.iteration()

Output from the example code:

AAA 0 bbb 0 AAA 1 bbb 1 AAA 2 bbb 2 AAA 3 bbb 3 AAA 4 bbb 4 AAA finishing bbb 5 bbb 6 bbb 7 bbb 8 bbb 9 bbb finishing

7 comments

S W 15 years, 10 months ago  # | flag

Exception Handling. This style of cooperative threading can be very useful. There is one small issue with the exception handling.

If the user's code which is run by the line:

tasks[0].generator.next()

raises IndexError, the Exception will silently pass.

Maciej Obarski (author) 15 years, 10 months ago  # | flag

Exception Handling. Explicit testing "len(tasks) == 0" is about 15% slower (for tasks that yield often), but the idea of silent exception is also bad. The solution:

except IndexError:
  if len(tasks) > 0: raise

Thanks for advice :-)

Steven Bethard 15 years, 10 months ago  # | flag

Or you can be a little more careful to put only what might raise an exception into the try/except block::

try:
    task = tasks[0]
except IndexError:
    continue
try:
    task.generator.next()
except StopIteration:
    del tasks[0]
else:
    tasks.rotate(-1)
Wai Yip Tung 15 years, 10 months ago  # | flag

See Charming Python series in IBM developer works.

Similar approach has been written about in

Charming Python: Implementing "weightless threads" with Python generators
David Mertz
http://www-128.ibm.com/developerworks/linux/library/l-pythrd.html

One limitation is the yield must be called in main(). If main() calls
another method you cannot execute in that method or it will be
treated as generator.
Maciej Obarski (author) 15 years, 10 months ago  # | flag

Thanks for the link, it's a nice article.

About yield in main(): I have come up with the recipe how to yield from generator used in another generator. It should make generator based kernels even more powerfull. Please see recipe 466299 for details.

http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/466299
tuco Leone 14 years, 4 months ago  # | flag

Batteries Included? My 2 cents and worth much less...

class TaskPool:
    ...
    ...
    def isRunning(self):
        return len(self.tasks) > 0

pool = TaskPool()
...
while pool.isRunning():
    pool.iteration()
Nikhil Sadasivan 7 years, 4 months ago  # | flag

Can anyone please explain the code to me? I'm a tad bit new to programming, and my project requires parallel processing. I know it is a really old post, but any help would be appreciated

Created by Maciej Obarski on Thu, 5 Jan 2006 (PSF)
Python recipes (4591)
Maciej Obarski's recipes (6)

Required Modules

Other Information and Tasks