
This code is part of HTTP Replicator (http://sourceforge.net/projects/http-replicator), a proxy server that replicates remote directory structures. The fiber module is an I/O scheduler similar to Python's built-in asyncore framework, but because it is based on generators it allows much more freedom in choosing switch points and wait states.

Python, 217 lines
import sys, select, time, socket, traceback


class SEND:

  def __init__( self, sock, timeout ):

    self.fileno = sock.fileno()
    self.expire = time.time() + timeout

  def __str__( self ):

    return 'SEND(%i,%s)' % ( self.fileno, time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )


class RECV:

  def __init__( self, sock, timeout ):

    self.fileno = sock.fileno()
    self.expire = time.time() + timeout

  def __str__( self ):

    return 'RECV(%i,%s)' % ( self.fileno, time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )


class WAIT:

  def __init__( self, timeout = None ):

    self.expire = timeout and time.time() + timeout or None

  def __str__( self ):

    return 'WAIT(%s)' % ( self.expire and time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )


class Fiber:

  def __init__( self, generator ):

    self.__generator = generator
    self.state = WAIT()

  def step( self, throw=None ):

    self.state = None
    try:
      if throw:
        assert hasattr( self.__generator, 'throw' ), throw
        # throw() itself returns the next yielded state if the generator recovers
        state = self.__generator.throw( AssertionError, throw )
      else:
        state = self.__generator.next()
      assert isinstance( state, (SEND, RECV, WAIT) ), 'invalid waiting state %r' % state
      self.state = state
    except KeyboardInterrupt:
      raise
    except StopIteration:
      del self.__generator
      pass
    except AssertionError, msg:
      print 'Error:', msg
    except:
      traceback.print_exc()

  def __repr__( self ):

    return '%i: %s' % ( self.__generator.gi_frame.f_lineno, self.state )


class GatherFiber( Fiber ):

  def __init__( self, generator ):

    Fiber.__init__( self, generator )
    self.__chunks = [ '[ 0.00 ] %s\n' % time.ctime() ]
    self.__start = time.time()
    self.__newline = True

  def step( self, throw=None ):

    stdout = sys.stdout
    stderr = sys.stderr
    try:
      sys.stdout = sys.stderr = self
      Fiber.step( self, throw )
    finally:
      sys.stdout = stdout
      sys.stderr = stderr

  def write( self, string ):

    if self.__newline:
      self.__chunks.append( '%6.2f   ' % ( time.time() - self.__start ) )
    self.__chunks.append( string )
    self.__newline = string.endswith( '\n' )

  def __del__( self ):

    sys.stdout.writelines( self.__chunks )
    if not self.__newline:
      sys.stdout.write( '\n' )


class DebugFiber( Fiber ):

  id = 0

  def __init__( self, generator ):

    Fiber.__init__( self, generator )
    self.__id = DebugFiber.id
    sys.stdout.write( '[ %04X ] %s\n' % ( self.__id, time.ctime() ) )
    self.__newline = True
    self.__stdout = sys.stdout
    DebugFiber.id = ( self.id + 1 ) % 65535

  def step( self, throw=None ):

    stdout = sys.stdout
    stderr = sys.stderr
    try:
      sys.stdout = sys.stderr = self
      Fiber.step( self, throw )
      if self.state:
        print 'Waiting at', self
    finally:
      sys.stdout = stdout
      sys.stderr = stderr

  def write( self, string ):

    if self.__newline:
      self.__stdout.write( '  %04X   ' % self.__id )
    self.__stdout.write( string )
    self.__newline = string.endswith( '\n' )


def spawn( generator, port, debug ):

  try:
    listener = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    listener.setblocking( 0 )
    listener.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, listener.getsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR ) | 1 )
    listener.bind( ( '', port ) )
    listener.listen( 5 )
  except Exception, e:
    print 'error: failed to create socket:', e
    return False

  if debug:
    myFiber = DebugFiber
  else:
    myFiber = GatherFiber

  print '  ....   Server started'
  try:

    fibers = []

    while True:

      tryrecv = { listener.fileno(): None }
      trysend = {}
      expire = None
      now = time.time()

      i = len( fibers )
      while i:
        i -= 1
        state = fibers[ i ].state

        if state and now > state.expire:
          if isinstance( state, WAIT ):
            fibers[ i ].step()
          else:
            fibers[ i ].step( throw='connection timed out' )
          state = fibers[ i ].state

        if not state:
          del fibers[ i ]
          continue

        if isinstance( state, RECV ):
          tryrecv[ state.fileno ] = fibers[ i ]
        elif isinstance( state, SEND ):
          trysend[ state.fileno ] = fibers[ i ]
        elif state.expire is None:
          continue

        if state.expire < expire or expire is None:
          expire = state.expire

      if expire is None:
        print '[ IDLE ]', time.ctime()
        sys.stdout.flush()
        canrecv, cansend, dummy = select.select( tryrecv, trysend, [] )
        print '[ BUSY ]', time.ctime()
        sys.stdout.flush()
      else:
        canrecv, cansend, dummy = select.select( tryrecv, trysend, [], max( expire - now, 0 ) )

      for fileno in canrecv:
        if fileno == listener.fileno():
          fibers.append( myFiber( generator( *listener.accept() ) ) )
        else:
          tryrecv[ fileno ].step()
      for fileno in cansend:
        trysend[ fileno ].step()

  except KeyboardInterrupt:
    print '  ....   Server terminated'
    return True
  except:
    print '  ....   Server crashed'
    traceback.print_exc( file=sys.stdout )
    return False

To talk on an arbitrary number of sockets simultaneously, one approach is to separate distinct transactions into fibers. Fibers can be thought of as threads, except that they lack the overhead of real POSIX threads. Quoted from Wikipedia (http://en.wikipedia.org/wiki/Thread_%28computer_science%29):

"Typically fibers are implemented entirely in userspace. As a result, context switching between fibers in a process does not require any interaction with the kernel at all and is therefore extremely efficient: a context switch can be performed by locally saving the CPU registers used by the currently executing fiber and loading the registers required by the fiber to be executed. Since scheduling occurs in userspace, the scheduling policy can be more easily tailored to the requirements of the program's workload."

The last point can be considered an advantage over real threads, which have no control over the place or time of switching and therefore require special care to be thread-safe. Fibers, on the other hand, switch at fixed points in code, which makes thread-safety considerably easier to achieve. The downside of fixed switching points is the risk of blocking calls in between, stalling the entire application. As the article continues:

"However, the use of blocking system calls in fibers can be problematic. If a fiber performs a system call that blocks, the other fibers in the process are unable to run until the system call returns. A typical example of this problem is when performing I/O: most programs are written to perform I/O synchronously. When an I/O operation is initiated, a system call is made, and does not return until the I/O operation has been completed. In the intervening period, the entire process is "blocked" by the kernel and cannot run, which starves other fibers in the same process from executing."

The above problem is a consequence of 'co-operative scheduling', which means that "a running fiber must explicitly 'yield' to allow another fiber to run". In practice, or at least in replicator's practice, one need only ensure that fibers yield right before every I/O operation and are resumed only after this operation is guaranteed not to block. To this end, fibers report their current communication state to the scheduler when yielding. Three states are supported:

  • RECV: fiber is awaiting data
  • SEND: fiber is queueing data
  • WAIT: fiber is sleeping

Arguments to the first two states are a socket and a timeout; the scheduler adds the socket to the pool of sockets monitored by 'select'. The third state will either wake the fiber after a certain time (WAIT #seconds), wake together with the first other fiber to resume operation (WAIT None), or force a queue run (WAIT 0). The latter is useful in case heavy processing risks stalling the server even without blocking system calls. Note again that the responsibility for yielding in time lies with the programmer.
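The yield-before-I/O pattern can be sketched outside the recipe as follows. This is only an illustration, not the module itself: RECV and SEND are redefined as minimal stand-ins with the same constructor arguments, echo_once and run are hypothetical names, and the syntax is Python 3 (the recipe above is Python 2). The driver handles a single fiber, whereas the real scheduler multiplexes many.

```python
import select
import socket
import time

# Stand-ins mirroring the recipe's state classes, redefined here only
# so that the sketch runs on its own.
class RECV:
    def __init__(self, sock, timeout):
        self.fileno = sock.fileno()
        self.expire = time.time() + timeout

class SEND:
    def __init__(self, sock, timeout):
        self.fileno = sock.fileno()
        self.expire = time.time() + timeout

def echo_once(sock):
    """Hypothetical fiber body: read one chunk, then write it back."""
    yield RECV(sock, 5)      # resume only once sock is readable
    data = sock.recv(1024)
    yield SEND(sock, 5)      # resume only once sock is writable
    sock.sendall(data)

def run(fiber):
    """Drive one fiber, waiting in select() for each yielded state."""
    for state in fiber:
        timeout = max(state.expire - time.time(), 0)
        if isinstance(state, SEND):
            ready = select.select([], [state.fileno], [], timeout)[1]
        else:
            ready = select.select([state.fileno], [], [], timeout)[0]
        if not ready:
            raise AssertionError('connection timed out')

client, server = socket.socketpair()
client.sendall(b'ping')
run(echo_once(server))
reply = client.recv(1024)
client.close()
server.close()
```

Because the generator yields immediately before each recv and sendall, neither call is reached until select has reported the socket ready.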

As of version 2.2 the Python language includes an object that can very easily be turned into the fiber just described: the generator (http://www.python.org/doc/2.2.3/whatsnew/node5.html). Complete with a yield statement to both put the fiber on hold and communicate the state to the server, this is precisely the kind of "resumable function" we are looking for. Other implementations of this idea speak of weightless threads (http://www.ibm.com/developerworks/library/l-pythrd.html), coroutines (http://o2s.csail.mit.edu/o2s-wiki/multitask), microthreads, tasklets, etc. Also notable in this respect is Stackless Python (http://www.stackless.com), introducing true microthreads in the language itself, and stackless in PyPy (http://codespeak.net/pypy/dist/pypy/doc/stackless.html). And, lastly, there is the asyncore module (http://docs.python.org/lib/module-asyncore.html) that is very similar to fibers except for its rigid framework. Indeed, the previous versions of replicator were based on this.
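To make the "resumable function" idea concrete, here is a minimal round-robin sketch using plain generators. It is not the recipe's scheduler: task and round_robin are hypothetical names, the yielded string merely stands in for a real state object, and the syntax is Python 3 (next(gen) rather than gen.next()).

```python
from collections import deque

def task(name, steps, trace):
    """Hypothetical fiber body: does a little work, then yields a state."""
    for i in range(steps):
        trace.append((name, i))    # work done between two switch points
        yield 'paused'             # switch point; the value is the "state"

def round_robin(tasks):
    """Resume each generator in turn until all of them have returned."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)          # run up to the next yield
        except StopIteration:
            continue               # generator returned: fiber is finished
        queue.append(current)      # still alive: back of the queue

trace = []
round_robin([task('a', 2, trace), task('b', 2, trace)])
```

The two tasks interleave only at their yield statements, so trace records the steps in the order a, b, a, b.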

A Python fiber terminates when it either returns or raises an exception. The scheduler takes care that exceptions are handled without interfering with other fibers and prints an error message or, optionally, a complete traceback message. As for printing, the default behaviour is to hold all output until the fiber terminates in order to have a better view of per-fiber operation. In debug mode this is changed to direct output. Fibers need not worry about this; it is all handled by the scheduler, which replaces sys.stdout with the configured printer upon fiber activation. This prevents tedious passing around of various log objects.
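The output redirection can be sketched in isolation like this. Gather, step and worker are hypothetical names written for this illustration in Python 3; the recipe's GatherFiber does the same swap inside its own step method and additionally prefixes timestamps. The flush method is an addition of the sketch, since the recipe's classes only define write.

```python
import sys

class Gather:
    """Hypothetical printer that buffers everything written to it."""
    def __init__(self):
        self.chunks = []

    def write(self, string):
        self.chunks.append(string)

    def flush(self):
        pass

def step(gen, printer):
    """Resume gen once with sys.stdout pointing at its own printer."""
    saved = sys.stdout
    sys.stdout = printer
    try:
        return next(gen, None)     # None once the generator has returned
    finally:
        sys.stdout = saved         # always restore the real stdout

def worker(name):
    print('hello from', name)
    yield
    print('bye from', name)

buf_a, buf_b = Gather(), Gather()
a, b = worker('a'), worker('b')
for gen, buf in ((a, buf_a), (b, buf_b), (a, buf_a), (b, buf_b)):
    step(gen, buf)

output_a = ''.join(buf_a.chunks)
output_b = ''.join(buf_b.chunks)
```

Although the two workers run interleaved, each buffer ends up holding only its own fiber's lines, which is what makes the per-fiber view possible without passing log objects around.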

It remains to explain how the system is put into action. This is handled by the spawn function, which binds a listening socket to the specified port and starts the main loop, waiting for incoming connections. For every such connection it creates an instance of the specified generator function. The main loop is exited by sending SIGINT, for instance by pressing ^C, which raises a KeyboardInterrupt. Finally, note that a valid generator:

  • takes two arguments: 1) the connecting socket and 2) its address
  • yields only objects of type WAIT, SEND, RECV

Other than that there are no restrictions. For an example application of this framework see HTTP Replicator (http://sourceforge.net/projects/http-replicator).
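As a rough illustration of the accept path, the sketch below shows how the (socket, address) pair returned by accept() becomes the two generator arguments. It is a stand-alone Python 3 approximation, not a call into the module: WAIT is a stand-in copy of the recipe's class, greeter is a hypothetical handler, and the loopback address with an OS-chosen port is an assumption made so the sketch can run anywhere.

```python
import select
import socket
import time

class WAIT:
    """Stand-in with the same constructor logic as the recipe's WAIT."""
    def __init__(self, timeout=None):
        self.expire = timeout and time.time() + timeout or None

def greeter(sock, address):
    """Hypothetical handler: takes (socket, address), yields only states."""
    yield WAIT()                   # hand control back once before any I/O
    sock.sendall(b'hello\n')
    sock.close()

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0))    # port 0: let the OS pick a free port
listener.listen(5)

client = socket.create_connection(listener.getsockname())

# The listener becomes readable when a connection is pending.
canrecv, _, _ = select.select([listener], [], [], 5)
fibers = [greeter(*listener.accept()) for _ in canrecv]

for fiber in fibers:
    for state in fiber:            # step until the generator returns
        assert isinstance(state, WAIT)

greeting = client.recv(1024)
client.close()
listener.close()
```

With the module itself, the equivalent would presumably be a single call of the form spawn( greeter, port, debug ), with the main loop doing the select, accept and stepping shown here.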

2 comments

S W 17 years ago

Useful, but.. Is this really a recipe? Looks like an application-size piece of code to me...

Gertjan (author) 13 years, 11 months ago

recipe changed. Not anymore, I think. I changed the recipe to Replicator's new engine, which I intend to be a general replacement for Python's built-in asyncore module.