|
|
This recipe shows how you can emulate coroutines in pure Python using generators.
By coroutine I mean a construct such as the one available, for example, in Simula 67 or Modula-2. Coroutines are like threads with two additional restrictions: at most one coroutine can be running at any time, and each coroutine yields control only at very specific points. Other terms I have heard for this concept are "cooperative multitasking", "non-preemptive multitasking" or "fibers" (on Windows).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
A coroutine implementation in Python.
Convert functions to iterator generators by replacing
    return <expression>
with
    yield <expression>
and function calls f(*args, **kw) with for loops:
    for i in f(*args, **kw):
        pass
    # the function value is in i now.
Unfortunately, you have to know which routines have been converted to
iterator generators and which have not :-(.
"""
from sets import Set
from time import time, sleep
from heapq import heappush, heappop
class coroutine(object):
    """
    A schedulable unit of work wrapping an iterator.

    'function(*args, **kw)' is called once, at construction time, to build
    the iterator (normally a generator) that the scheduler advances one
    step at a time.

    suspend(), activate() and sleep() register their request with the
    scheduler as soon as they are called.  Each returns an iterable that
    yields None exactly once, so a generator can hand control back to the
    scheduler right after the request with

        for dummy in me.sleep(5):
            yield dummy
    """

    def __init__(self, scheduler, function, *args, **kw):
        self.iterator = function(*args, **kw)
        self.scheduler = scheduler

    def suspend(self):
        """Ask the scheduler to suspend this coroutine."""
        # Registered eagerly: previously these were generator functions, so
        # a plain call (without iterating the result) did nothing at all.
        self.scheduler.suspend(self)
        return iter((None,))

    def activate(self):
        """Ask the scheduler to (re)activate this coroutine."""
        self.scheduler.activate(self)
        return iter((None,))

    def sleep(self, sleeptime):
        """Ask the scheduler to suspend this coroutine for 'sleeptime' seconds."""
        self.scheduler.sleep(self, sleeptime)
        return iter((None,))
class scheduler(object):
    """
    Scheduler which runs coroutines in a round robin fashion.
    No priority scheme is implemented.

    A coroutine is any object with an 'iterator' attribute; one step of a
    coroutine is one call of that iterator's next method.  Requests to
    suspend, activate or sleep are queued and applied by handle_events(),
    which run() calls between round-robin passes.
    """

    def __init__(self):
        # Running coroutines are kept in three parallel structures:
        #   self.running                -- bound "next" methods, called in order
        #   self.running_coroutines     -- the coroutine objects, same order
        #   self.running_coroutines_map -- coroutine -> index in both lists
        self.running = []
        self.running_coroutines = []
        self.running_coroutines_map = {}
        # Coroutines which are not running.  The builtin set replaces
        # sets.Set, which is deprecated since Python 2.6 and gone in Python 3.
        self.suspended = set()
        # Coroutines which are to transition from running to suspended.
        self.waiting_suspend = []
        # Not read anywhere in this class; kept for backward compatibility.
        self.waiting_running = []
        # Coroutines which should switch to the running state.
        self.waiting_activate = []
        # Heap of coroutines waiting to be activated at a specific time.
        # Entry format: (time, sequence_number, coroutine).  The increasing
        # sequence number breaks ties between equal times, so coroutine
        # objects themselves are never compared (a TypeError on Python 3).
        self.timequeue = []
        self._timer_sequence = 0
        # Indicates that a state transition should happen.
        self.event = False

    def add_coroutine(self, coroutine):
        """Register 'coroutine' and request that it start running."""
        self.suspended.add(coroutine)
        self.activate(coroutine)

    def suspend(self, coroutine):
        """Request that 'coroutine' stop running at the next transition point."""
        self.event = True
        self.waiting_suspend.append(coroutine)

    def activate(self, coroutine):
        """Request that 'coroutine' start running at the next transition point."""
        self.event = True
        self.waiting_activate.append(coroutine)

    def sleep(self, coroutine, sleeptime):
        """
        Sleep for 'sleeptime' seconds: suspend 'coroutine' and schedule its
        reactivation 'sleeptime' seconds from now.
        """
        self._timer_sequence += 1
        item = (time() + sleeptime, self._timer_sequence, coroutine)
        heappush(self.timequeue, item)
        self.suspend(coroutine)

    def runat(self, coroutine, runtime):
        """
        Run at a specific time ('runtime' is an absolute time() value).
        """
        self.sleep(coroutine, sleeptime = runtime - time())

    def handle_events(self):
        """
        Handle state transitions from suspended to running
        and vice versa.

        Suspensions are applied before activations.  Suspending a coroutine
        that is not running (already suspended or finished) and activating
        one that is already running are both ignored.
        """
        self.event = False
        to_suspend, self.waiting_suspend = self.waiting_suspend, []
        for c in to_suspend:
            index = self.running_coroutines_map.get(c)
            if index is None:
                continue
            self.remove_coroutine_from_running_list(index)
            self.suspended.add(c)
        to_activate, self.waiting_activate = self.waiting_activate, []
        for c in to_activate:
            if c in self.running_coroutines_map:
                continue
            self.suspended.discard(c)
            self.running_coroutines_map[c] = len(self.running)
            self.running.append(self._advance_method(c.iterator))
            self.running_coroutines.append(c)

    def remove_coroutine_from_running_list(self, index):
        """
        Remove the running coroutine at position 'index' and renumber the
        map entries of every coroutine that moved down one position.
        """
        c = self.running_coroutines[index]
        del self.running[index]
        del self.running_coroutines[index]
        del self.running_coroutines_map[c]
        for position in range(index, len(self.running_coroutines)):
            self.running_coroutines_map[self.running_coroutines[position]] = position

    def pop_timequeue(self):
        """Remove the earliest timer entry and request activation of its coroutine."""
        entry = heappop(self.timequeue)
        self.activate(entry[-1])

    def run(self):
        """
        Run the coroutines until none is running, no timer is pending and
        no state transition is outstanding.  Coroutines that are suspended
        with no pending timer are left suspended when this returns.
        """
        self.event = True
        while self.running or self.timequeue or self.event:
            self.handle_events()
            while self.running:
                self._run_pass()
                # Wake every sleeper whose deadline has passed.
                while self.timequeue and self.timequeue[0][0] <= time():
                    self.pop_timequeue()
                if self.event:
                    self.handle_events()
            if self.timequeue:
                # Nothing is runnable: wait for the earliest timer.  Its
                # deadline may already be past, and time.sleep() rejects
                # negative durations.
                delay = self.timequeue[0][0] - time()
                if delay > 0:
                    sleep(delay)
                self.pop_timequeue()

    def _run_pass(self):
        """Advance every running coroutine by exactly one step, in order."""
        # Finished coroutines are removed only after the pass, so the lists
        # do not shift under self.index (used by current_coroutine()) and
        # no coroutine gets an extra step when another one finishes.
        finished = []
        for self.index, advance in enumerate(self.running):
            try:
                advance()
            except StopIteration:
                finished.append(self.running_coroutines[self.index])
        for c in finished:
            self.remove_coroutine_from_running_list(self.running_coroutines_map[c])

    def _advance_method(self, iterator):
        """Return the bound method that advances 'iterator' (Python 2 or 3)."""
        try:
            return iterator.__next__
        except AttributeError:
            return iterator.next

    def current_coroutine(self):
        """Return the coroutine being advanced in the current pass."""
        return self.running_coroutines[self.index]
class semaphore(object):
    """
    Implements a binary semaphore for coroutines.

    On release, ownership passes directly to the longest-waiting coroutine.
    'free' only becomes True again when nobody is waiting.
    """

    def __init__(self):
        self.free = True
        # Coroutines blocked in acquire(), oldest first.
        self.waiting = []

    def acquire(self, coroutine):
        """
        Acquire the semaphore on behalf of 'coroutine'.

        If the semaphore is free it is taken at once, and the returned
        iterable is empty.  Otherwise 'coroutine' is queued and its
        scheduler is asked to suspend it.  The returned iterable then yields
        None once, so a generator can wait for ownership with

            for dummy in sem.acquire(me):
                yield dummy
        """
        if self.free:
            self.free = False
            return iter(())
        self.waiting.append(coroutine)
        # Talk to the scheduler directly: the request must be registered
        # now, not deferred until someone iterates a generator.
        coroutine.scheduler.suspend(coroutine)
        return iter((None,))

    def release(self):
        """Release the semaphore, handing it to the oldest waiter if any."""
        if self.waiting:
            coroutine = self.waiting.pop(0)
            coroutine.scheduler.activate(coroutine)
        else:
            self.free = True
class monitor(object):
    """
    Serializes calls to generator functions through a binary semaphore.
    """

    def __init__(self):
        self.sem = semaphore()

    def run_protected(self, coroutine, function, *args, **kw):
        """
        Call 'function' on behalf of 'coroutine' while holding the monitor.

        'function' is assumed to be a generator function.  It is driven to
        completion here, without returning control to the scheduler, so at
        most one such call can be running at any time.

        Returns the last value the generator yielded, or None if it yielded
        nothing.  The semaphore is released even if 'function' raises.
        """
        self.sem.acquire(coroutine)
        try:
            # Pre-bind so a generator that yields nothing returns None
            # instead of raising UnboundLocalError.
            result = None
            for result in function(*args, **kw):
                pass
            return result
        finally:
            self.sem.release()
# The module-wide scheduler; current_coroutine() below reads it.
current_scheduler = scheduler()


def current_coroutine():
    """
    Return the coroutine that 'current_scheduler' is currently stepping.
    """
    active_scheduler = current_scheduler
    return active_scheduler.current_coroutine()
if __name__ == '__main__':
    import sys

    # Cleared by the watchdog to stop every Ackermann computation.
    running = True

    def ack(m, n):
        """
        Compute the Ackermann function A(m, n) as a generator.

        Yields None for every intermediate step and the result as the final
        value.  Once 'running' is cleared, the computation stops.  In that
        case the last value yielded is None, or nothing is yielded at all.
        """
        if not running:
            return
        if m == 0:
            yield n + 1
            return
        if m > 0 and n == 0:
            result = None
            for result in ack(m - 1, 1):
                yield None
            if result is not None:
                yield result
            return
        if m > 0 and n > 0:
            inner = None
            for inner in ack(m, n - 1):
                yield None
            if inner is None:
                return      # stopped by the watchdog
            result = None
            for result in ack(m - 1, inner):
                yield None
            if result is not None:
                yield result
            return

    def a(a1, a2):
        """Compute A(a1, a2) step by step, then print it under the monitor."""
        write = sys.stdout.write

        def p(a1, a2, result):
            # Print the line piecewise; run_protected keeps the pieces together.
            write("Ackermann ")
            yield None
            write("%s " % a1)
            yield None
            write("%s " % a2)
            yield None
            write("= ")
            yield None
            write("%s\n" % result)

        result = None
        for result in ack(a1, a2):
            yield None
        if result is None:
            return          # stopped by the watchdog before finishing
        print_monitor.run_protected(current_coroutine(), p, a1, a2, result)

    def watchdog():
        """Clear 'running' after 600 seconds, stopping all computations."""
        global running
        # The sleep request must be iterated so this coroutine actually
        # yields, and is suspended, before the flag is cleared.
        for dummy in current_coroutine().sleep(600):
            yield dummy
        running = False

    print_monitor = monitor()
    count = 0
    for i in range(100):
        # NOTE(review): j is unused here, so each a(1, i) is started ten
        # times (giving the advertised 2000 coroutines) — confirm intended.
        for j in range(10):
            count += 1
            current_scheduler.add_coroutine(coroutine(current_scheduler, a, 1, i))
    for i in range(100):
        # NOTE(review): large arguments nest generators very deeply and may
        # exceed the interpreter's recursion limit — confirm.
        for j in range(10):
            count += 1
            current_scheduler.add_coroutine(coroutine(current_scheduler, a, i, j))
    current_scheduler.add_coroutine(coroutine(current_scheduler, watchdog))
    print("Starting the run of %d coroutines" % count)
    current_scheduler.run()
|
The recipe is not minimal: Modula-2, for example, only provides the "transfer" procedure to explicitly transfer control from one coroutine to another. Instead, I implement a round-robin scheduler which can interleave many coroutines. The recipe also shows how easy it is to implement some traditional concurrency constructs: semaphores and monitors. The non-preemptive nature of coroutines, however, greatly reduces the need for such synchronization constructs.
As an illustration, the __main__ part of the module starts 2000 coroutines which compute the values of a highly recursive function (the Ackermann function) in a quasi-parallel fashion (run on a Windows machine with 1 GB of memory). The whole computation stops after 10 minutes.
Why would you choose coroutines over, say, threads?
You gain complete control over scheduling.
Since this implementation does not reserve stack space for each
coroutine, you might be able to run more coroutines than threads.
|
NOT coroutines! What is implemented here is not coroutines, but rather what I called "weightless threads" in my article:
http://www-106.ibm.com/developerworks/library/l-pythrd.html
However, you can implement SEMI-coroutines using a slightly different technique; I described this in:
http://www-106.ibm.com/developerworks/library/l-pygen.html#h3
The examples in both of these articles are quite a lot shorter than in this recipe, while being just as powerful (if not more so).
I have expanded the description and the discussion of this recipe. I hope this leaves no doubt what this code is and what it does.
Still not right, IMO. I think that, as of now, this recipe is still not about coroutines.
Coroutines, in my understanding, can simply be explained as "functions that save their control state between calls and can be called many times".
There is no point at all in having a scheduler, because a scheduler is not related to the concept of coroutines.
The only implementations of coroutines in Python I can think of are available in Stackless, at least until GvR gives us call/cc :)