This recipe shows how you can emulate coroutines in pure Python using generators.
By coroutine I mean a construct like the one available, for example, in Simula 67 or Modula-2. Coroutines are like threads with two additional restrictions: at most one coroutine can be running at any time, and each coroutine yields control only at very specific points. Other terms I have heard for this concept are "cooperative multitasking", "non-preemptive multitasking" and fibers (on Windows).
"""
A coroutine implementation in Python.

Convert functions to iterator generators by replacing

    return <expression>

with

    yield <expression>

and function calls: f(*args, **kw) with for loops:

    for i in f(*args, **kw):
        pass
    # the function value is in i now.

Unfortunately, you have to know which routine has been converted to
an iterator generator and which have not :-(.
"""
from sets import Set
from time import time, sleep
from heapq import heappush, heappop


class coroutine(object):
    def __init__(self, scheduler, function, *args, **kw):
        self.iterator = function(*args, **kw)
        self.scheduler = scheduler

    def suspend(self):
        self.scheduler.suspend(self)
        yield None

    def activate(self):
        self.scheduler.activate(self)
        yield None

    def sleep(self, sleeptime):
        self.scheduler.sleep(self, sleeptime)
        yield None


class scheduler(object):
    """
    Scheduler which runs coroutines in a round robin fashion.
    No priority scheme is implemented.
    """
    def __init__(self):
        self.running = []
        self.running_coroutines = []
        self.running_coroutines_map = {}
        # This data describes the coroutines which are running.
        # self.running contains the next routines to be called.
        # self.running_coroutines_map maps coroutines to the position
        # in self.running_coroutines
        self.suspended = Set()
        # Coroutines which are not running.
        self.waiting_suspend = []
        # Coroutines which are to transition from running to suspended
        self.waiting_running = []
        # Coroutines which are to start running
        self.waiting_activate = []
        # Coroutines which should switch to the running state.
        self.timequeue = []
        # heap of coroutines which are waiting to be activated
        # at a specific time.
        # Entry format: (time, coroutine)
        self.event = False
        # Indicates that a state transition should happen.

    def add_coroutine(self, coroutine):
        self.suspended.add(coroutine)
        self.activate(coroutine)

    def suspend(self, coroutine):
        self.event = True
        self.waiting_suspend.append(coroutine)

    def activate(self, coroutine):
        self.event = True
        self.waiting_activate.append(coroutine)

    def sleep(self, coroutine, sleeptime):
        """
        Sleep for 'sleeptime'.
        """
        item = (time() + sleeptime, coroutine)
        heappush(self.timequeue, item)
        self.suspend(coroutine)

    def runat(self, coroutine, runtime):
        """
        Run at a specific time.
        """
        self.sleep(coroutine, sleeptime = runtime - time())

    def handle_events(self):
        """
        Handle state transitions from suspended to running
        and vice versa.
        """
        self.event = False
        if self.waiting_suspend:
            for c in self.waiting_suspend:
                # self.suspended is a Set, so use add(), not append().
                self.suspended.add(c)
            for c in self.waiting_suspend:
                index = self.running_coroutines_map[c]
                self.remove_coroutine_from_running_list(index)
            self.waiting_suspend = []
        if self.waiting_activate:
            for c in self.waiting_activate:
                self.suspended.remove(c)
                self.running_coroutines_map[c] = len(self.running)
                self.running.append(c.iterator.next)
                self.running_coroutines.append(c)
            self.waiting_activate = []

    def remove_coroutine_from_running_list(self, index):
        c = self.running_coroutines[index]
        # Delete the entry at 'index', which need not be self.index.
        del self.running[index]
        del self.running_coroutines[index]
        del self.running_coroutines_map[c]
        # The entries behind 'index' have moved down by one position.
        for other in self.running_coroutines[index:]:
            self.running_coroutines_map[other] -= 1

    def pop_timequeue(self):
        waittime, coroutine = heappop(self.timequeue)
        self.activate(coroutine)

    def run(self):
        running = self.running
        running_coroutines = self.running_coroutines
        timequeue = self.timequeue
        self.event = True
        while running or timequeue or self.event:
            self.handle_events()
            while running:
                try:
                    for self.index, next in enumerate(running):
                        next()
                except StopIteration:
                    self.remove_coroutine_from_running_list(self.index)
                while timequeue and timequeue[0][0] <= time():
                    self.pop_timequeue()
                if self.event:
                    self.handle_events()
            if timequeue:
                sleeptime = timequeue[0][0] - time()
                # The wake-up time may already have passed.
                sleep(max(0, sleeptime))
                self.pop_timequeue()

    def current_coroutine(self):
        return self.running_coroutines[self.index]


class semaphore(object):
    """
    Implements a binary semaphore.
    """
    def __init__(self):
        self.free = True
        self.waiting = []

    def acquire(self, coroutine):
        if self.free:
            self.free = False
        else:
            coroutine.suspend()
            self.waiting.append(coroutine)

    def release(self):
        if self.waiting:
            coroutine = self.waiting.pop(0)
            coroutine.activate()
        else:
            self.free = True


class monitor(object):
    def __init__(self):
        self.sem = semaphore()

    def run_protected(self, coroutine, function, *args, **kw):
        """
        Call 'function' of 'coroutine'. 'function' is assumed to be a
        generator function. At most one function call of this kind can
        be running at any time.
        """
        self.sem.acquire(coroutine)
        try:
            for i in function(*args, **kw):
                pass
            return i
        finally:
            self.sem.release()


current_scheduler = scheduler()

def current_coroutine():
    return current_scheduler.current_coroutine()


if __name__ == '__main__':
    running = True

    def ack(m, n):
        if not running:
            return
        if m == 0:
            yield n + 1
            return
        if m > 0 and n == 0:
            for i in ack(m-1, 1):
                yield None
            yield i
            return
        if m > 0 and n > 0:
            for i in ack(m, n-1):
                yield None
            t = i
            for i in ack(m-1, t):
                yield None
            yield i
            return

    def a(a1, a2):
        def p(a1, a2):
            print "Ackermann",
            yield None
            print a1,
            yield None
            print a2,
            yield None
            print "=",
            yield None
            print i
        for i in ack(a1, a2):
            yield None
        print_monitor.run_protected(current_coroutine(), p, a1, a2)

    def watchdog():
        global running
        # sleep() is a generator method: it only takes effect
        # when it is iterated.
        for dummy in current_coroutine().sleep(600):
            yield dummy
        running = False
        yield None

    print_monitor = monitor()
    count = 0
    for i in range(100):
        for j in range(10):
            count += 1
            cr1 = coroutine(current_scheduler, a, 1, i)
            current_scheduler.add_coroutine(cr1)
    for i in range(100):
        for j in range(10):
            count += 1
            cr1 = coroutine(current_scheduler, a, i, j)
            current_scheduler.add_coroutine(cr1)
    current_scheduler.add_coroutine(coroutine(current_scheduler, watchdog))
    print "Starting the run of ", count, "coroutines"
    current_scheduler.run()
The recipe is not minimal: Modula-2, for example, only provides the "transfer" procedure to explicitly transfer control from one coroutine to another. Instead, I implement a round robin scheduler which can run many coroutines at the same time. The recipe also shows how easy it is to implement some traditional constructs for concurrency: semaphores and monitors. The non-preemptive nature of coroutines, however, greatly reduces the need for such synchronization constructs.
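To make the round robin idea concrete, here is a minimal sketch, written in Python 3 syntax and much smaller than the scheduler in the recipe. It has no suspending, sleeping or semaphores. The names run_round_robin and worker are made up for this illustration and are not part of the recipe.

```python
from collections import deque


def run_round_robin(tasks):
    """Step each generator in turn until all of them are exhausted.

    Returns the yielded values in the order they were produced.
    """
    queue = deque(tasks)
    trace = []
    while queue:
        task = queue.popleft()
        try:
            trace.append(next(task))
        except StopIteration:
            continue            # finished: drop it from the rotation
        queue.append(task)      # still alive: back to the end of the queue
    return trace


def worker(name, steps):
    # Hypothetical task: every yield is a point where control
    # goes back to the scheduler loop.
    for step in range(steps):
        yield (name, step)


trace = run_round_robin([worker("a", 2), worker("b", 3)])
print(trace)
```

The two workers are interleaved step by step, and the longer one simply continues alone once the shorter one has finished.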
As an illustration, the __main__ part of the module starts 2000 coroutines which compute the values of a highly recursive function (the Ackermann function) in a quasi-parallel fashion (on a 1 GB Windows machine). The whole computation stops after 10 minutes.
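The conversion rule from the module docstring (replace return with yield, and replace a call with a for loop whose last value is the result) can be tried in isolation. The following is a small sketch in Python 3 syntax, without the scheduler and without the "running" flag; run_to_completion is a helper invented for this example.

```python
def ack(m, n):
    """Ackermann function as a generator.

    Yields None at every intermediate step and the result as its
    last value.
    """
    if m == 0:
        yield n + 1
    elif n == 0:
        for value in ack(m - 1, 1):
            yield None          # give control back while the inner call runs
        yield value             # the last value of the inner call is its result
    else:
        for value in ack(m, n - 1):
            yield None
        inner = value
        for value in ack(m - 1, inner):
            yield None
        yield value


def run_to_completion(gen):
    # Hypothetical driver: step the generator to the end and keep
    # the last value it produced.
    result = None
    steps = 0
    for result in gen:
        steps += 1
    return result, steps


result, steps = run_to_completion(ack(2, 3))
print(result)
```

A scheduler would call next() on the outer generator once per turn instead of looping to the end, which is what lets many such computations advance side by side.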
Why would you choose coroutines over, say, threads?
You gain complete control over scheduling.
Since this implementation does not reserve stack space for each coroutine, you might be able to run more coroutines than threads.
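As a rough illustration of that last point, the sketch below (Python 3 syntax) creates 100000 generator-based tasks and steps them all to completion in a single thread. The task count and the ticker function are arbitrary choices for this example, and the sketch makes no claim about exact memory use.

```python
def ticker(limit):
    # Hypothetical task: counts up to 'limit', yielding after each step.
    count = 0
    while count < limit:
        count += 1
        yield count


tasks = [ticker(3) for _ in range(100000)]
total = 0
while tasks:
    alive = []
    for task in tasks:
        try:
            next(task)
        except StopIteration:
            continue            # this task is done
        total += 1
        alive.append(task)
    tasks = alive

print(total)
```

Starting the same number of operating system threads would usually be far more expensive, or fail outright, on many systems.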
NOT coroutines! What is implemented here is not coroutines, but rather what I called "weightless threads" in my article:
http://www-106.ibm.com/developerworks/library/l-pythrd.html
However, you can implement SEMI-coroutines using a slightly different technique; I described this in:
http://www-106.ibm.com/developerworks/library/l-pygen.html#h3
The examples in both of these articles are quite a lot shorter than in this recipe, while being just as powerful (if not more so).
I have expanded the description and the discussion of this recipe. I hope this leaves no doubt what this code is and what it does.
Still not right, imo. I think that as of now this recipe is still not talking about coroutines.
Coroutines, in my understanding, can be explained simply as "functions that save control state between calls and can be called many times".
There is no point at all in having a scheduler, because that does not relate to the concept of coroutines.
The only implementations of coroutines I can think of in Python are available in Stackless, at least until GvR gives us callcc :)