This code is part of HTTP Replicator (http://sourceforge.net/projects/http-replicator), a proxy server that replicates remote directory structures. The fiber module is an I/O scheduler similar to python's built-in asyncore framework, but based on generators it allows for much more freedom in choosing swich points and wait states.
| import sys, select, time, socket, traceback
class SEND:
def __init__( self, sock, timeout ):
self.fileno = sock.fileno()
self.expire = time.time() + timeout
def __str__( self ):
return 'SEND(%i,%s)' % ( self.fileno, time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )
class RECV:
def __init__( self, sock, timeout ):
self.fileno = sock.fileno()
self.expire = time.time() + timeout
def __str__( self ):
return 'RECV(%i,%s)' % ( self.fileno, time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )
class WAIT:
def __init__( self, timeout = None ):
self.expire = timeout and time.time() + timeout or None
def __str__( self ):
return 'WAIT(%s)' % ( self.expire and time.strftime( '%H:%M:%S', time.localtime( self.expire ) ) )
class Fiber:
def __init__( self, generator ):
self.__generator = generator
self.state = WAIT()
def step( self, throw=None ):
self.state = None
try:
if throw:
assert hasattr( self.__generator, 'throw' ), throw
self.__generator.throw( AssertionError, throw )
state = self.__generator.next()
assert isinstance( state, (SEND, RECV, WAIT) ), 'invalid waiting state %r' % state
self.state = state
except KeyboardInterrupt:
raise
except StopIteration:
del self.__generator
pass
except AssertionError, msg:
print 'Error:', msg
except:
traceback.print_exc()
def __repr__( self ):
return '%i: %s' % ( self.__generator.gi_frame.f_lineno, self.state )
class GatherFiber( Fiber ):
def __init__( self, generator ):
Fiber.__init__( self, generator )
self.__chunks = [ '[ 0.00 ] %s\n' % time.ctime() ]
self.__start = time.time()
self.__newline = True
def step( self, throw=None ):
stdout = sys.stdout
stderr = sys.stderr
try:
sys.stdout = sys.stderr = self
Fiber.step( self, throw )
finally:
sys.stdout = stdout
sys.stderr = stderr
def write( self, string ):
if self.__newline:
self.__chunks.append( '%6.2f ' % ( time.time() - self.__start ) )
self.__chunks.append( string )
self.__newline = string.endswith( '\n' )
def __del__( self ):
sys.stdout.writelines( self.__chunks )
if not self.__newline:
sys.stdout.write( '\n' )
class DebugFiber( Fiber ):
id = 0
def __init__( self, generator ):
Fiber.__init__( self, generator )
self.__id = DebugFiber.id
sys.stdout.write( '[ %04X ] %s\n' % ( self.__id, time.ctime() ) )
self.__newline = True
self.__stdout = sys.stdout
DebugFiber.id = ( self.id + 1 ) % 65535
def step( self, throw=None ):
stdout = sys.stdout
stderr = sys.stderr
try:
sys.stdout = sys.stderr = self
Fiber.step( self, throw )
if self.state:
print 'Waiting at', self
finally:
sys.stdout = stdout
sys.stderr = stderr
def write( self, string ):
if self.__newline:
self.__stdout.write( ' %04X ' % self.__id )
self.__stdout.write( string )
self.__newline = string.endswith( '\n' )
def spawn( generator, port, debug ):
try:
listener = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
listener.setblocking( 0 )
listener.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, listener.getsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR ) | 1 )
listener.bind( ( '', port ) )
listener.listen( 5 )
except Exception, e:
print 'error: failed to create socket:', e
return False
if debug:
myFiber = DebugFiber
else:
myFiber = GatherFiber
print ' .... Server started'
try:
fibers = []
while True:
tryrecv = { listener.fileno(): None }
trysend = {}
expire = None
now = time.time()
i = len( fibers )
while i:
i -= 1
state = fibers[ i ].state
if state and now > state.expire:
if isinstance( state, WAIT ):
fibers[ i ].step()
else:
fibers[ i ].step( throw='connection timed out' )
state = fibers[ i ].state
if not state:
del fibers[ i ]
continue
if isinstance( state, RECV ):
tryrecv[ state.fileno ] = fibers[ i ]
elif isinstance( state, SEND ):
trysend[ state.fileno ] = fibers[ i ]
elif state.expire is None:
continue
if state.expire < expire or expire is None:
expire = state.expire
if expire is None:
print '[ IDLE ]', time.ctime()
sys.stdout.flush()
canrecv, cansend, dummy = select.select( tryrecv, trysend, [] )
print '[ BUSY ]', time.ctime()
sys.stdout.flush()
else:
canrecv, cansend, dummy = select.select( tryrecv, trysend, [], max( expire - now, 0 ) )
for fileno in canrecv:
if fileno is listener.fileno():
fibers.append( myFiber( generator( *listener.accept() ) ) )
else:
tryrecv[ fileno ].step()
for fileno in cansend:
trysend[ fileno ].step()
except KeyboardInterrupt:
print ' .... Server terminated'
return True
except:
print ' .... Server crashed'
traceback.print_exc( file=sys.stdout )
return False
|
To talk on an arbitrary number of sockets simultaneously, one approach is to separate distinct transactions in fibers. Fibers can be though of as threads, except that they lack the overhead of real posix threads. Quoted from wikipedia (http://en.wikipedia.org/wiki/Thread_%28computer_science%29):
"Typically fibers are implemented entirely in userspace. As a result, context switching between fibers in a process does not require any interaction with the kernel at all and is therefore extremely efficient: a context switch can be performed by locally saving the CPU registers used by the currently executing fiber and loading the registers required by the fiber to be executed. Since scheduling occurs in userspace, the scheduling policy can be more easily tailored to the requirements of the program's workload."
The last point can be considered an advantage over real threads, which have no control over place or time of switching and therefore require special care of being thread-safe. Fibers, on the other hand, switch at fixed points in code, which makes thread-safety considerably easier to achieve. The downside of fixed switching points is the risk of blocking calls in between, stalling the entire application. As the article continues:
"However, the use of blocking system calls in fibers can be problematic. If a fiber performs a system call that blocks, the other fibers in the process are unable to run until the system call returns. A typical example of this problem is when performing I/O: most programs are written to perform I/O synchronously. When an I/O operation is initiated, a system call is made, and does not return until the I/O operation has been completed. In the intervening period, the entire process is "blocked" by the kernel and cannot run, which starves other fibers in the same process from executing."
The above problem is a consequence of 'co-operative scheduling', which means that "a running fiber must explicitly 'yield' to allow another fiber to run". In practice - or at least, replicator's practice, one needs only assure that fibers yield right before every I/O operation, and are resumed only after this operation is guaranteed not to block. To this end, fibers communicate the current state of communication before yielding to the scheduler. Three states are supported:
- RECV: fiber is awaiting data
- SEND: fiber is queueing data
- WAIT: fiber is sleeping
Arguments to the first two states are a socket and timeout, which the schedular adds to the total pool of 'select' monitored sockets. The third state will either wake the fiber after a certain time (WAIT #seconds), wake together with the first other fiber to resume operation (WAIT None), or force a queue run (WAIT 0). The latter is useful in case heavy processing risks stalling the server even without blocking system calls. Note again that the responsibility of yielding in time is with the programmer.
As of version 2.2 the Python language includes on object that can very easily be turned into the fiber just described: the generator (http://www.python.org/doc/2.2.3/whatsnew/node5.html). Complete with a yield statement to both put the fiber on hold and communitate the state to the server, this is precisely the kind of "resumable function" we are looking for. Other implementations of this idea speak of weightless threads (http://www.ibm.com/developerworks/library/l-pythrd.html), coroutines (http://o2s.csail.mit.edu/o2s-wiki/multitask), microthreads, tasklets, etc. Also notable in this respect is Stackless Python (http://www.stackless.com), introducing true microthreads in the language itself, and stackless in PyPy (http://codespeak.net/pypy/dist/pypy/doc/stackless.html). And, lastly, there is the asyncore module (http://docs.python.org/lib/module-asyncore.html) that is very similar to fibers except for its rigid framework. Indeed, the previous versions of replicator were based on this.
A Python fiber terminates when either it returns or raises an exception. The scheduler takes care that exceptions are handled without interfering with other fibers and prints an error message or, optionally, a complete traceback message. As for printing, the default behaviour is to hold all output until the fiber terminates in order to have a better view of per-fiber operation. In debug mode this is changed to direct output. Fibers need not worry about this; it is all handled by the scheduler which replaces sys.stdout with the configured printer upon fiber activation. This to prevent tedius passing around of various log objects.
Remains to explain how the system is put in action. This is handled by the spawn method, which opens a socket at the specified localhost port and starts the main loop listening for incoming connections. For every such connection it creates an instance of the specified generator function. The main loop is exited by sending SIGHUP or ^C, in either case raising a KeyboardInterrupt. Finally, note that a valid generator:
- takes two arguments: 1) the connecting socket and 2) its address
- yields only objects of type WAIT, SEND, RECV
Other than that there are no restrictions. For an example application of this framework see HTTP Replicator (http://sourceforge.net/projects/http-replicator).
Useful, but.. Is this really a recipe? Looks like an application-size piece of code to me...
recipe changed. Not anymore, I think. I changed the recipe to Replicator's new engine, which I indend to be a general replacement of python's builtin asyncore module.