#!/usr/bin/env python
"""asyncpipes.py: Asynchronous pipe communication using asyncore.
Extends `asyncore.file_dispatcher` to provide extra functionality for reading
from and writing to pipes. Uses the observer pattern to provide notification
of new data and closed pipes.
References:
http://code.activestate.com/recipes/576962/ [observer.py]
http://parijatmishra.blogspot.com/2008/01/writing-server-with-pythons-asyncore.html
"""
import os
from sys import stderr
from errno import EPIPE, EBADF
from asyncore import file_dispatcher
from traceback import print_exc
from observer import Observable
if __name__ == '__main__':
import optparse
from asyncore import loop
__version__ = '$Revision: 3742 $'.split()[1]
__usage__ = 'usage: %prog [options]'
class PipeDispatcher(Observable, file_dispatcher):
"""Dispatch pipe I/O using asyncore.
Allows synchronous access to the pipe by delegating to the filehandle,
though synchronous and asynchronous access should probably not be mixed.
"""
# Event sent when the pipe is closed
PIPE_CLOSED = 'closed'
# Default value for maximum pipe data
pipe_maxdata = 512
def __init__(self, fh, map=None, maxdata=None, ignore_broken_pipe=False, logger=None, **obsopt):
"""Wrap a dispatcher around the passed filehandle.
If `ignore_broken_pipe` is `True`, an `EPIPE` or `EBADF` error will
call `handle_close()` instead of `handle_expt()`. Useful when broken
pipes should be handled quietly.
`logger` is a logger which will be used to log unusual exceptions;
otherwise, they will be printed to stderr.
"""
self.maxdata = maxdata if maxdata else self.pipe_maxdata
self.__logger = logger
if ignore_broken_pipe:
self.__ignore_errno = [EPIPE, EBADF]
else:
self.__ignore_errno = []
self.__filehandle = fh
# Check for overduplication of the file descriptor and close the extra
fddup = os.dup(fh.fileno())
file_dispatcher.__init__(self, fddup, map=map)
if (self._fileno != fddup): os.close (fddup)
Observable.__init__(self, **obsopt)
def __getattr__(self, attr):
"""Delegate to the filehandle."""
return getattr(self.__filehandle, attr)
def close(self):
"""Close the pipe and calls the _obs_notify() method."""
if self.__filehandle:
try:
try:
file_dispatcher.close(self)
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe dispatcher")
else: print_exc(file=stderr)
try:
self.__filehandle.close()
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe filehandle")
else: print_exc(file=stderr)
finally:
self.__filehandle = None
self._obs_notify(event=self.PIPE_CLOSED)
def readable(self):
"""Return `True` if the pipe is still open."""
return (self.__filehandle is not None)
def writable(self):
"""Return `True` if the pipe is still open."""
return (self.__filehandle is not None)
def send(self, buffer):
"""Check for closed and broken pipes when sending data."""
if self.__filehandle:
try:
return file_dispatcher.send(self, buffer)
except OSError, oe:
if oe.errno in self.__ignore_errno: self.handle_close()
else: self.handle_expt()
return 0
def recv(self, buffer_size):
"""Check for closed and broken pipes when receiving data."""
if self.__filehandle:
try:
return file_dispatcher.recv(self, buffer_size)
except OSError, oe:
if oe.errno in self.__ignore_errno: self.handle_close()
else: self.handle_expt()
return ''
def handle_close(self):
"""Call `self.close()` to close the pipe."""
self.close()
def handle_expt(self):
"""Print a traceback and call `handle_close()` to close the pipe."""
if self.__logger: self.__logger.exception("Unusual exception in pipe I/O")
else: print_exc(file=stderr)
self.handle_close()
def _obs_exception(self):
"""Handle an exception raised by an observer."""
if self.__logger: self.__logger.exception("Unusual exception in pipe observer")
else: print_exc(file=stderr)
class InputPipeDispatcher(PipeDispatcher):
"""Push data to an input pipe using asyncore."""
def __init__(self, fh, close_when_done=False, **keywmap):
"""Wrap a dispatcher around the passed input filehandle.
`close_when_done` closes the pipe as soon as the buffer is empty after
the first `push_data()`. Useful for communicating with subprocesses
that read stdin to EOF before proceeding.
"""
self.__buffer = None
self.__offset = 0
self.__close_when_done = close_when_done
PipeDispatcher.__init__(self, fh, **keywmap)
def readable(self):
"""Return `False`; input pipes are never readable."""
return False
def writable(self):
"""Return `True` if data is in the buffer and the pipe is open."""
return PipeDispatcher.writable(self) and (self.__buffer is not None)
def handle_write(self):
"""Write up to `maxdata` bytes to the pipe."""
if self.writable():
self.__offset += self.send(
self.__buffer[self.__offset:self.__offset+self.maxdata])
# If the buffer is all written, empty it.
if self.__offset >= len(self.__buffer):
self.__buffer = None
self.__offset = 0
if self.__close_when_done: self.close()
def push_data(self, data):
"""Push some data by putting it in the write buffer.
Raise `EOFError` if the pipe is already closed.
"""
if not PipeDispatcher.writable(self):
raise EOFError('Input pipe closed.')
elif self.__buffer:
# Since we have to construct a new string, remove the already-sent data.
self.__buffer = self.__buffer[self.__offset:] + data
else:
self.__buffer = data
self.__offset = 0
class OutputPipeDispatcher(PipeDispatcher):
"""Get data from an output pipe using asyncore."""
PIPE_DATA = 'data'
"""Event sent when new data is available in the pipe."""
def __init__(self, fh, universal_newlines=False, **keywmap):
"""Wrap a dispatcher around the passed output filehandle.
`universal_newlines` converts all newlines found in the data stream to
'\n', just as in `subprocess.Popen`.
"""
self._universal_newlines = universal_newlines
self.__data = []
self.__endedcr = False
PipeDispatcher.__init__(self, fh, **keywmap)
def writable(self):
"""Return `False`; output pipes are never writable."""
return False
def handle_read(self):
"""Read and queue up to `maxdata` bytes, and notify any observers."""
if self.readable():
data = self.recv(self.maxdata)
if data:
self.__data.append(data)
self._obs_notify(self.PIPE_DATA)
def _translate_newlines(self, data):
data = data.replace("\r\n", "\n")
data = data.replace("\r", "\n")
return data
def fetch_data(self, clear=False):
"""Return all the accumulated data from the pipe as a string.
If `clear` is `True`, clear the accumulated data.
"""
if self.__data:
datastr = ''.join(self.__data)
if clear:
self.__data[:] = []
if datastr and self._universal_newlines:
# Take care of a newline split across cleared reads.
stripnl = self.__endedcr
if clear:
self.__endedcr = (datastr[-1] == '\r')
if stripnl and datastr[0] == '\n':
return self._translate_newlines(datastr[1:])
else:
return self._translate_newlines(datastr)
else:
return datastr
else:
return ''
def readlines(self, clear=False):
"""Return all complete lines from the pipe as a list of strings.
If `clear` is `True`, clear the accumulated data, but leave any
incomplete line
"""
datastr = self.fetch_data(clear)
lines = datastr.splitlines(True)
if lines[-1][-1] != '\n':
if clear:
self.__data[:] = [ lines[-1] ]
return lines[0:-1]
return lines
if __name__ == '__main__':
class TestAsyncPipe:
def __init__(self, maxprint, lineterm, loops, maxwrite, maxread):
self._maxprint = maxprint
self._lineterm = lineterm
self._loops = loops
rp, wp = os.pipe()
self._inpipe = InputPipeDispatcher(os.fdopen(wp, 'wb'),
maxdata=maxwrite)
self._outpipe = OutputPipeDispatcher(os.fdopen(rp, 'rb'),
maxdata=maxread, universal_newlines=(lineterm != '\n'))
self._inpipe.obs_add(self)
self._outpipe.obs_add(self)
def _printdata(self, data):
if not data:
printable = ''
elif len(data) > self._maxprint + 1:
printable = ': %r' % ('%s...%s' % (data[:self._maxprint], data[-1]))
else:
printable = ': %r' % data
print '%d bytes received%s' % (len(data), printable)
def handle_notify(self, pipe, event):
if event == OutputPipeDispatcher.PIPE_DATA:
data = pipe.fetch_data(clear=False)
self._printdata(data)
if data.endswith('\n'):
self._loops -= 1
if self._loops:
data = pipe.fetch_data(clear=True).strip()
self._inpipe.push_data(data + self._lineterm)
else:
self._inpipe.close()
self._outpipe.close()
else:
print '%s %s' % (pipe.__class__, event)
optparser = optparse.OptionParser(usage=__usage__, version=__version__)
optparser.disable_interspersed_args()
optparser.add_option('--data', default='0123456789',
help='Data string to be sent [%default]')
optparser.add_option('--copies', type=int, metavar='N', default=1,
help='Repeat the data N times (to test large transfers) [%default]')
optparser.add_option('--maxread', type='int', metavar='BYTES', default=1024,
help='Maximum data to read in each chunk [%default]')
optparser.add_option('--maxwrite', type='int', metavar='BYTES', default=1024,
help='Maximum data to write in each chunk [%default]')
optparser.add_option('--loops', type='int', metavar='N', default=5,
help='Number of loops to execute [%default]')
optparser.add_option('--lineterm', type='choice', metavar='TERM', choices=['CR','CRLF','LF'],
default='LF', help='Line terminator to send: CR, CRLF, or LF [%default]')
(options, args) = optparser.parse_args()
# Translate the line terminator to an escape sequence.
lineterm = {'CR':'\r', 'CRLF':'\r\n', 'LF':'\n'}[options.lineterm]
pipe_handler = TestAsyncPipe(len(options.data), lineterm, options.loops, options.maxwrite, options.maxread)
pipe_handler._inpipe.push_data(options.data * options.copies + lineterm)
loop()
Diff to Previous Revision
--- revision 8 2012-12-06 19:29:33
+++ revision 9 2013-10-29 16:48:22
@@ -23,7 +23,7 @@
import optparse
from asyncore import loop
-__version__ = '$Revision: 3245 $'.split()[1]
+__version__ = '$Revision: 3742 $'.split()[1]
__usage__ = 'usage: %prog [options]'
@@ -234,11 +234,24 @@
else:
return ''
+ def readlines(self, clear=False):
+ """Return all complete lines from the pipe as a list of strings.
+
+ If `clear` is `True`, clear the accumulated data, but leave any
+ incomplete line
+ """
+ datastr = self.fetch_data(clear)
+ lines = datastr.splitlines(True)
+ if lines[-1][-1] != '\n':
+ if clear:
+ self.__data[:] = [ lines[-1] ]
+ return lines[0:-1]
+ return lines
+
if __name__ == '__main__':
class TestAsyncPipe:
- def __init__(self, maxprint,
- loops=5, maxwrite=1024, maxread=1024, lineterm='\n'):
+ def __init__(self, maxprint, lineterm, loops, maxwrite, maxread):
self._maxprint = maxprint
self._lineterm = lineterm
self._loops = loops
@@ -290,12 +303,9 @@
optparser.add_option('--lineterm', type='choice', metavar='TERM', choices=['CR','CRLF','LF'],
default='LF', help='Line terminator to send: CR, CRLF, or LF [%default]')
(options, args) = optparser.parse_args()
- # Return options as dictionary.
- optdict = lambda *args: dict([(k, getattr(options, k)) for k in args])
# Translate the line terminator to an escape sequence.
lineterm = {'CR':'\r', 'CRLF':'\r\n', 'LF':'\n'}[options.lineterm]
- pipe_handler = TestAsyncPipe(len(options.data), lineterm=lineterm,
- **optdict('loops','maxwrite','maxread'))
+ pipe_handler = TestAsyncPipe(len(options.data), lineterm, options.loops, options.maxwrite, options.maxread)
pipe_handler._inpipe.push_data(options.data * options.copies + lineterm)
loop()