Welcome, guest | Sign In | My Account | Store | Cart
"""
A coroutine implementation in Python.

Convert functions to iterator generators by replacing

return <expression>

with

yield <expression>

and function calls: f(*args, **kw) with for loops:

for i in f(*args, **kw):
   pass
# the function value is in i now.

Unfortunately, you have to know which routine has been converted to
an iterator generator and which have not :-(.
"""
from sets import Set
from time import time, sleep
from heapq import heappush, heappop

class coroutine(object):
   def __init__(self, scheduler, function, *args, **kw):
      self.iterator = function(*args, **kw)
      self.scheduler = scheduler
      
   def suspend(self):
      self.scheduler.suspend(self)
      yield None

   def activate(self):
      self.scheduler.activate(self)
      yield None
      
   def sleep(self, sleeptime):
      self.scheduler.sleep(self, sleeptime)
      yield None
      
class scheduler(object):
   """
   Scheduler which runs coroutines in a round robin fashion.
   No priority scheme is implemented.
   """
   def __init__(self):
      self.running = []
      self.running_coroutines = []
      self.running_coroutines_map = {}
      # This data describes the coroutines which are running.
      # self.running contains the next routines to be called.
      # self.running_coroutine maps coroutines to the position
      # in self.running_coroutines

      self.suspended = Set()
      # Coroutines which are not running.
      
      self.waiting_suspend = []
      # Coroutines which are to transition from running to suspended
      
      self.waiting_running = []
      # Coroutines which are to start running
      
      self.waiting_activate = []
      # Coroutines which should switch to the running state.
      
      self.timequeue = []
      # heap of coroutines which are waiting to be activated
      # at a specific time.
      # Entry format: (time, coroutine)
      
      self.event = False
      # Indicates that a state transition should happen.
      
   def add_coroutine(self, coroutine):
      self.suspended.add(coroutine)
      self.activate(coroutine)

   def suspend(self, coroutine):
      self.event = True
      self.waiting_suspend.append(coroutine)
   
   def activate(self, coroutine):
      self.event = True
      self.waiting_activate.append(coroutine)

   def sleep(self, coroutine, sleeptime):
      """
      Sleep for 'sleeptime'.
      """
      item = (time() + sleeptime, coroutine)
      heappush(self.timequeue, item)
      self.suspend(coroutine)
      
   def runat(self, coroutine, runtime):
      """
      Run at a specific time.
      """
      self.sleep(coroutine, sleeptime = runtime - time())
      
   def handle_events(self):
      """
      Handle state transitions from suspended to running
      and vice versa.
      """
      self.event = False
            
      if self.waiting_suspend:
         for c in self.waiting_suspend:
            self.suspended.append(c)

         for c in self.waiting_suspend:
            index = self.running_coroutines_map[c]
            self.remove_coroutine_from_running_list(index)

         self.waiting_suspend = []

      if self.waiting_activate:
         for c in self.waiting_activate:
            self.suspended.remove(c)
            self.running_coroutines_map[c] = len(self.running)
            self.running.append(c.iterator.next)
            self.running_coroutines.append(c)
         self.waiting_activate = []
         
   def remove_coroutine_from_running_list(self, index):
      c = self.running_coroutines[index]
      del self.running[self.index]
      del self.running_coroutines[index]
      del self.running_coroutines_map[c]
      
   def pop_timequeue(self):
      waittime, coroutine = heappop(self.timequeue)
      self.activate(coroutine)
      
   def run(self):
      running = self.running
      running_coroutines = self.running_coroutines
      timequeue = self.timequeue
      
      self.event = True
      while running or timequeue or self.event:
         self.handle_events()

         while running:
            try:
               for self.index, next in enumerate(running):
                  next()
            except StopIteration:
               self.remove_coroutine_from_running_list(self.index)

            while timequeue and timequeue[0][0] <= time():
               self.pop_timequeue()

            if self.event:
               self.handle_events()

         if timequeue:
            sleeptime = timequeue[0][0] - time()
            sleep(sleeptime)
            self.pop_timequeue()

   def current_coroutine(self):
      return self.running_coroutines[self.index]

class semaphore(object):
   """
   Implements a binary semaphore.
   """
   def __init__(self):
      self.free = True
      self.waiting = []
      
   def acquire(self, coroutine):
      if self.free:
         self.free = False
      else:
         coroutine.suspend()
         self.waiting.append(coroutine)

   def release(self):
      if self.waiting:
         coroutine = self.waiting.pop(0)
         coroutine.activate()
      else:
         self.free = True

class monitor(object):
   def __init__(self):
      self.sem = semaphore()

   def run_protected(self, coroutine, function, *args, **kw):
      """
      Call 'function' of 'coroutine'. 'function' is assumed to be a generator function.
      at most one function call of this kind may can be running at any time.
      """
      self.sem.acquire(coroutine)
      try:
         for i in function(*args, **kw):
            pass
         return i
      finally:
         self.sem.release()

current_scheduler = scheduler()
   
def current_coroutine():
   return current_scheduler.current_coroutine()

      
if __name__ == '__main__':
   running = True
   def ack(m, n):
      if not running:
         return
      if m == 0:
         yield n + 1
         return
      if m > 0 and n == 0:
         for i in ack(m-1, 1):
            yield None
         yield i
         return
      if m > 0 and n > 0:
         for i in ack(m, n-1):
            yield None
         t = i
         for i in ack(m-1, t):
            yield None
         yield i
         return

   def a(a1, a2):
      def p(a1, a2):
         print "Ackermann",
         yield None
         print a1,
         yield None
         print a2,
         yield None
         print "=",
         yield None,
         print i
      for i in ack(a1, a2):
         yield None
      print_monitor.run_protected(current_coroutine(), p, a1, a2)

   def watchdog():
      global running
      current_coroutine().sleep(600)
      running = False
      yield None
      
   print_monitor = monitor()
   count = 0
   for i in range(100):
      for j in range(10):
         count += 1
         cr1 = coroutine(current_scheduler, a, 1, i)
         current_scheduler.add_coroutine(cr1)
   for i in range(100):
      for j in range(10):
         count += 1
         cr1 = coroutine(current_scheduler, a, i, j)
         current_scheduler.add_coroutine(cr1)
   current_scheduler.add_coroutine(coroutine(current_scheduler, watchdog))
   print "Starting the run of ", count, "coroutines"
   current_scheduler.run()

History

  • revision 4 (19 years ago)
  • previous revisions are not available