
This recipe shows how you can emulate coroutines in pure Python using generators.

By coroutine I mean a construct such as the one available in Simula 67 or Modula-2. Coroutines are like threads with two additional restrictions: at most one coroutine can be running at any time, and each coroutine yields control only at very specific points. Other terms I have heard for this concept are "cooperative multitasking", "non-preemptive multitasking", and fibers (on Windows).
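Both restrictions can be seen in a minimal sketch of cooperative multitasking with generators. This is a Python 3 illustration, not the recipe's scheduler. `task` and `round_robin` are hypothetical names. Each generator runs until its next `yield`, and then the next generator in a queue takes over.

```python
from collections import deque

def task(name, steps, trace):
    """A toy coroutine: records each step, then gives up control."""
    for i in range(steps):
        trace.append((name, i))
        yield  # the only point where this task can lose control

def round_robin(tasks):
    """Run generators one step at a time, in turn, until all finish."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)          # run until the next yield
        except StopIteration:
            continue               # finished: drop it from the queue
        queue.append(current)      # not finished: back of the line

trace = []
round_robin([task("A", 2, trace), task("B", 3, trace)])
print(trace)  # [('A', 0), ('B', 0), ('A', 1), ('B', 1), ('B', 2)]
```

Only one generator executes at any moment, and control changes hands only at a `yield`.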

"""
A coroutine implementation in Python.

Convert functions to iterator generators by replacing

return <expression>

with

yield <expression>

and function calls: f(*args, **kw) with for loops:

for i in f(*args, **kw):
   pass
# the function value is in i now.

Unfortunately, you have to know which routines have been converted to
iterator generators and which have not :-(.
"""
from sets import Set
from time import time, sleep
from heapq import heappush, heappop

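class coroutine(object):
   """
   Wraps the generator created by function(*args, **kw).

   suspend(), activate() and sleep() are generators themselves, so
   they must be called with the for-loop convention described above:

      for i in current_coroutine().sleep(10):
         yield i

   Calling them without iterating only creates a generator object
   and has no effect.
   """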
   def __init__(self, scheduler, function, *args, **kw):
      self.iterator = function(*args, **kw)
      self.scheduler = scheduler
      
   def suspend(self):
      self.scheduler.suspend(self)
      yield None

   def activate(self):
      self.scheduler.activate(self)
      yield None
      
   def sleep(self, sleeptime):
      self.scheduler.sleep(self, sleeptime)
      yield None
      
class scheduler(object):
   """
   Scheduler which runs coroutines in a round robin fashion.
   No priority scheme is implemented.
   """
   def __init__(self):
      self.running = []
      self.running_coroutines = []
      self.running_coroutines_map = {}
      # This data describes the coroutines which are running.
      # self.running holds the bound 'next' methods to be called;
      # self.running_coroutines holds the matching coroutines, and
      # self.running_coroutines_map maps each coroutine to its position
      # in self.running_coroutines.

      self.suspended = Set()
      # Coroutines which are not running.
      
      self.waiting_suspend = []
      # Coroutines which are to transition from running to suspended
      
      self.waiting_running = []
      # Coroutines which are to start running
      
      self.waiting_activate = []
      # Coroutines which should switch to the running state.
      
      self.timequeue = []
      # heap of coroutines which are waiting to be activated
      # at a specific time.
      # Entry format: (time, coroutine)
      
      self.event = False
      # Indicates that a state transition should happen.
      
   def add_coroutine(self, coroutine):
      self.suspended.add(coroutine)
      self.activate(coroutine)

   def suspend(self, coroutine):
      self.event = True
      self.waiting_suspend.append(coroutine)
   
   def activate(self, coroutine):
      self.event = True
      self.waiting_activate.append(coroutine)

   def sleep(self, coroutine, sleeptime):
      """
      Sleep for 'sleeptime'.
      """
      item = (time() + sleeptime, coroutine)
      heappush(self.timequeue, item)
      self.suspend(coroutine)
      
   def runat(self, coroutine, runtime):
      """
      Run at a specific time.
      """
      self.sleep(coroutine, sleeptime = runtime - time())
      
   def handle_events(self):
      """
      Handle state transitions from suspended to running
      and vice versa.
      """
      self.event = False
            
      if self.waiting_suspend:
         for c in self.waiting_suspend:
            self.suspended.add(c)

         for c in self.waiting_suspend:
            index = self.running_coroutines_map[c]
            self.remove_coroutine_from_running_list(index)

         self.waiting_suspend = []

      if self.waiting_activate:
         for c in self.waiting_activate:
            self.suspended.remove(c)
            self.running_coroutines_map[c] = len(self.running)
            self.running.append(c.iterator.next)
            self.running_coroutines.append(c)
         self.waiting_activate = []
         
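   def remove_coroutine_from_running_list(self, index):
      c = self.running_coroutines[index]
      del self.running[index]
      del self.running_coroutines[index]
      del self.running_coroutines_map[c]
      # Every coroutine after 'index' has moved down one position;
      # keep running_coroutines_map in step with the lists.
      for i in range(index, len(self.running_coroutines)):
         self.running_coroutines_map[self.running_coroutines[i]] = i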
      
   def pop_timequeue(self):
      waittime, coroutine = heappop(self.timequeue)
      self.activate(coroutine)
      
   def run(self):
      running = self.running
      running_coroutines = self.running_coroutines
      timequeue = self.timequeue
      
      self.event = True
      while running or timequeue or self.event:
         self.handle_events()

         while running:
            try:
               for self.index, next in enumerate(running):
                  next()
            except StopIteration:
               self.remove_coroutine_from_running_list(self.index)

            while timequeue and timequeue[0][0] <= time():
               self.pop_timequeue()

            if self.event:
               self.handle_events()

         if timequeue:
            sleeptime = timequeue[0][0] - time()
            # The wake-up time may already have passed; never sleep
            # for a negative time.
            sleep(max(0, sleeptime))
            self.pop_timequeue()

   def current_coroutine(self):
      return self.running_coroutines[self.index]

class semaphore(object):
   """
   Implements a binary semaphore.
   """
   def __init__(self):
      self.free = True
      self.waiting = []
      
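   def acquire(self, coroutine):
      """
      Generator: iterate it with the for-loop convention. If the
      semaphore is taken, 'coroutine' is suspended until release()
      hands the semaphore over to it.
      """
      if self.free:
         self.free = False
      else:
         self.waiting.append(coroutine)
         for i in coroutine.suspend():
            yield i

   def release(self):
      if self.waiting:
         # Hand the semaphore directly to the first waiter (it stays taken).
         # Call the scheduler directly: coroutine.activate() is a generator.
         coroutine = self.waiting.pop(0)
         coroutine.scheduler.activate(coroutine)
      else:
         self.free = True

class monitor(object):
   def __init__(self):
      self.sem = semaphore()

   def run_protected(self, coroutine, function, *args, **kw):
      """
      Generator: call the generator function 'function' on behalf of
      'coroutine'. At most one function call of this kind can be
      running at any time; the last value yielded is the function value.
      (A yield inside try/finally requires Python 2.5 or later.)
      """
      for i in self.sem.acquire(coroutine):
         yield i
      try:
         for i in function(*args, **kw):
            yield i
      finally:
         self.sem.release()

current_scheduler = scheduler()

def current_coroutine():
   return current_scheduler.current_coroutine()


if __name__ == '__main__':
   running = True
   def ack(m, n):
      # Ackermann's function as a generator: the last value yielded is
      # the function value, and every step yields None so the scheduler
      # can switch. Once 'running' is False it stops without a value.
      if not running:
         return
      if m == 0:
         yield n + 1
         return
      if m > 0 and n == 0:
         for i in ack(m-1, 1):
            yield None
         if running:
            yield i
         return
      if m > 0 and n > 0:
         for i in ack(m, n-1):
            yield None
         if not running:
            return
         t = i
         for i in ack(m-1, t):
            yield None
         if running:
            yield i
         return

   def a(a1, a2):
      def p(a1, a2):
         print "Ackermann",
         yield None
         print a1,
         yield None
         print a2,
         yield None
         print "=",
         yield None
         print i
      try:
         for i in ack(a1, a2):
            yield None
      except RuntimeError:
         # Every nested generator adds an interpreter frame, so large
         # arguments exceed the recursion limit: skip those pairs.
         return
      if not running:
         return
      for step in print_monitor.run_protected(current_coroutine(), p, a1, a2):
         yield step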

   def watchdog():
      global running
      # sleep() is a generator: iterate it, or the watchdog never sleeps.
      for i in current_coroutine().sleep(600):
         yield i
      running = False
      yield None
      
   print_monitor = monitor()
   count = 0
   for i in range(100):
      for j in range(10):
         count += 1
         cr1 = coroutine(current_scheduler, a, 1, i)
         current_scheduler.add_coroutine(cr1)
   for i in range(100):
      for j in range(10):
         count += 1
         cr1 = coroutine(current_scheduler, a, i, j)
         current_scheduler.add_coroutine(cr1)
   current_scheduler.add_coroutine(coroutine(current_scheduler, watchdog))
   print "Starting the run of ", count, "coroutines"
   current_scheduler.run()
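The calling convention from the module docstring can be shown in isolation with a small Python 3 sketch. `add`, `caller`, `add_pep380` and `caller_pep380` are hypothetical names.

* The converted function yields `None` at its switch points and yields its result last.
* The caller passes each intermediate step up to whoever drives it. After the loop, it finds the function value in the loop variable.
* For comparison, Python 3.3 and later (PEP 380) let a generator `return` a value. `yield from` then delivers that value directly.

```python
def add(x, y):
    # Converted function: 'return x + y' has become 'yield x + y'.
    yield None            # a point where the scheduler may switch
    yield x + y           # the last value yielded is the function value

def caller():
    # Converted call: 'i = add(2, 3)' has become a for loop.
    for i in add(2, 3):
        yield None        # pass each step up to the scheduler
    # The function value is in i now.
    yield i * 10

def add_pep380(x, y):
    yield None
    return x + y          # Python 3.3+: a generator may return a value

def caller_pep380():
    i = yield from add_pep380(2, 3)   # i receives the return value
    yield i * 10

print(list(caller()))          # [None, None, 50]
print(list(caller_pep380()))   # [None, 50]
```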

The recipe is not minimal. Modula-2, for example, provides only the "transfer" procedure, which explicitly transfers control from one coroutine to another. Instead, I implement a round-robin scheduler that interleaves many coroutines. The recipe also shows how easy it is to implement some traditional concurrency constructs: semaphores and monitors. The non-preemptive nature of coroutines, however, greatly reduces the need for such synchronization constructs.
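Modula-2-style explicit transfer can be approximated with generators and a small trampoline, as a contrast to the round-robin scheduler. This is a Python 3 sketch. `transfer_loop`, `ping` and `pong` are hypothetical names.

Each coroutine yields the name of the coroutine that should run next, and a driver loop resumes that one. A Python generator can only yield to its caller, so the driver performs the actual switch on the coroutine's behalf.

```python
def transfer_loop(coroutines, start):
    """Resume coroutines by name; each one yields the name of the
    coroutine to transfer control to, or None to stop. Returns the
    order in which coroutines ran."""
    order = []
    current = start
    while current is not None:
        order.append(current)
        try:
            current = next(coroutines[current])   # resume, get the target
        except StopIteration:
            break                                  # a coroutine ended
    return order

def ping():
    for _ in range(2):
        yield "pong"      # transfer control to pong
    yield None            # stop the whole system

def pong():
    while True:
        yield "ping"      # transfer control to ping

order = transfer_loop({"ping": ping(), "pong": pong()}, "ping")
print(order)  # ['ping', 'pong', 'ping', 'pong', 'ping']
```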

As an illustration, the __main__ part of the module starts 2000 coroutines that compute values of Ackermann's function, a highly recursive function, in a quasi-parallel fashion (on a 1 GB Windows machine). A watchdog coroutine stops the whole computation after 10 minutes.
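The 10-minute stop relies on the scheduler's time queue. The watchdog coroutine sleeps, and when it wakes it clears a shared flag that the workers check. The following Python 3 sketch reproduces that mechanism on a small scale. `run_virtual`, `worker` and `watchdog` are hypothetical names.

* It uses a virtual clock, advancing one tick per round of turns, so it finishes instantly.
* Each heap entry carries a sequence number, unlike the recipe's `(time, coroutine)` entries. Python 3 cannot order two generator objects when wake-up times tie.

```python
import heapq
import itertools

def run_virtual(tasks, tick=1.0):
    """Round-robin scheduler with a virtual clock.

    A task yields None to give up control for one round, or a number
    to sleep for that many virtual seconds. Returns the final clock.
    """
    clock = 0.0
    ready = list(tasks)
    sleeping = []                 # heap of (wake_time, seq, task)
    seq = itertools.count()       # tie-breaker: tasks are never compared
    while ready or sleeping:
        if not ready:
            # Idle: jump the clock forward to the earliest wake-up time.
            clock = max(clock, sleeping[0][0])
        while sleeping and sleeping[0][0] <= clock:
            ready.append(heapq.heappop(sleeping)[2])
        still_ready = []
        for task in ready:
            try:
                request = next(task)
            except StopIteration:
                continue              # finished tasks are dropped
            if request is None:
                still_ready.append(task)
            else:
                heapq.heappush(sleeping, (clock + request, next(seq), task))
        ready = still_ready
        clock += tick                 # each round costs one virtual tick
    return clock

def worker(state):
    while state["running"]:
        state["turns"] += 1
        yield None

def watchdog(state, seconds):
    yield seconds                 # sleep, as the recipe's watchdog does
    state["running"] = False      # then tell every worker to stop

state = {"running": True, "turns": 0}
final_clock = run_virtual([worker(state), watchdog(state, 10)])
print(final_clock, state["turns"])  # 12.0 11
```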

Why would you choose coroutines over, say, threads?

  • You gain complete control over scheduling.

  • Since this implementation does not reserve a separate stack for each coroutine, you might be able to run more coroutines than threads.
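Both points can be illustrated with a Python 3 sketch. `steps` and `priority_run` are hypothetical names.

* Because the scheduler is ordinary code, changing the policy is a local edit. Here `priority_run` always resumes the runnable task with the lowest priority number, a scheme the recipe's round-robin scheduler does not implement.
* Each task is just a heap-allocated generator object rather than a thread with its own stack, so ten thousand of them can be started in one call.

```python
import heapq
import itertools

def steps(name, count, trace):
    """A toy task that records each of its steps."""
    for i in range(count):
        trace.append(f"{name}{i}")
        yield

def priority_run(tasks):
    """Run (priority, generator) pairs one step at a time.

    Lower numbers run first; tasks with equal priority take turns,
    because each re-queued task gets a fresh, larger sequence number.
    """
    seq = itertools.count()
    heap = [(prio, next(seq), gen) for prio, gen in tasks]
    heapq.heapify(heap)
    while heap:
        prio, _, gen = heapq.heappop(heap)
        try:
            next(gen)
        except StopIteration:
            continue
        heapq.heappush(heap, (prio, next(seq), gen))

trace = []
priority_run([(1, steps("low", 2, trace)), (0, steps("high", 2, trace))])
print(trace)  # ['high0', 'high1', 'low0', 'low1']

many = []
priority_run((0, steps("t", 3, many)) for _ in range(10000))
print(len(many))  # 30000
```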

3 comments

David Mertz 19 years, 7 months ago

NOT coroutines! What is implemented here is not coroutines, but rather what I called "weightless threads" in my article:

http://www-106.ibm.com/developerworks/library/l-pythrd.html

However, you can implement SEMI-coroutines using a slightly different technique; I described this in:

http://www-106.ibm.com/developerworks/library/l-pygen.html#h3

The examples in both of these articles are quite a lot shorter than in this recipe, while being just as powerful (if not more so).

Bernhard Mulder (author) 19 years, 7 months ago

I have expanded the description and the discussion of this recipe. I hope this leaves no doubt what this code is and what it does.

gabriele renzi 19 years, 7 months ago

Still not right, IMO. I think that, as of now, this recipe is still not talking about coroutines.

Coroutines, in my understanding, can simply be explained as "functions that save control state between calls and can be called many times".

There is no point at all in having a scheduler, because that is not related to the concept of coroutines.

The only implementations of coroutines I can think of in Python are available in Stackless, at least until GvR gives us callcc :)
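The definition in the last comment, a function that keeps its control state between calls, corresponds closely to generators with `send()`. Python 2.5 added `send()` through PEP 342, after this discussion took place.

The Python 3 sketch below uses a hypothetical `averager` coroutine. Each call resumes it where it stopped, with its local variables intact, and no scheduler is involved.

```python
def averager():
    """Coroutine: receives numbers via send() and yields the running mean."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # suspend here; resume with the sent value
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)              # run to the first yield ("priming")
print(avg.send(10))    # 10.0
print(avg.send(20))    # 15.0
print(avg.send(60))    # 30.0
```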

Created by Bernhard Mulder on Tue, 17 Aug 2004 (PSF)