import cStringIO
import itertools
import Queue
import subprocess
import threading
class Transporter(object):
def __init__(self, get, getSentinel, put, putSentinel):
self._thread = threading.Thread(target=self.loop, args=(get, getSentinel, put, putSentinel))
self._thread.daemon = True
self._thread.start()
@classmethod
def loop(cls, get, getSentinel, put, putSentinel):
for item in iter(get, getSentinel):
put(item)
put(putSentinel)
def join(self, timeout=None):
self._thread.join(timeout)
class Communicator(object):
def __init__(self, write, writeSentinel, read, readSentinel):
self._out = Queue.Queue()
self._write = write
self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
self._writeSentinel = writeSentinel
self._in = {}
self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
self._readSentinel = readSentinel
self._closing = False
self._lock = threading.RLock()
self._next = itertools.count().next
def __del__(self):
self.close()
def put(self, command, request=True):
with self._lock:
deferredResult = Queue.Queue() if request else None
if not self._closing:
self._out.put((deferredResult, command))
elif deferredResult:
deferredResult.put(StopIteration())
return deferredResult
def close(self, timeout=None):
self._shutdown()
self._writer.join(timeout)
self._reader.join(timeout)
def _notify(self, requestId, result):
self._in.pop(requestId).put(result)
def _onSend(self, item):
(deferredResult, command) = item
requestId = None
if deferredResult:
requestId = self._next()
self._in[requestId] = deferredResult
try:
self._write(requestId, command)
except StopIteration as e:
self._shutdown()
if deferredResult:
self._notify(requestId, e)
except Exception as e:
if deferredResult:
self._notify(requestId, e)
def _onReceive(self, item):
if item == self._readSentinel:
frame = item
self._shutdown()
self._writer.join()
requestIds = list(self._in)
else:
(requestId, frame) = item
requestIds = [requestId]
for requestId in requestIds:
self._notify(requestId, frame)
def _shutdown(self):
with self._lock:
if self._closing:
return
self.put(self._writeSentinel, request=False)
self._closing = True
class Writer(object):
SENTINEL = None
def __init__(self, stream):
self._stream = stream
def __call__(self, requestId, command):
frame = self._compile(requestId, command)
if frame is not None:
try:
self._stream.write(frame)
self._stream.flush()
except:
command = self.SENTINEL
if command == self.SENTINEL:
try:
self._stream.close()
finally:
raise StopIteration()
def _compile(self, requestId, command):
raise NotImplementedError()
class Reader(object):
SENTINEL = None
FRAME_SEPARATOR = '\x00'
def __init__(self, stream):
self._stream = stream
self.__read = self._read().next
def __call__(self):
return self.__read()
def _next(self):
try:
return self._stream.read(1)
except:
return ''
def _parse(self, frame):
raise NotImplementedError()
def _read(self):
self._reset()
for character in iter(self._next, ''):
if character != self.FRAME_SEPARATOR:
self._buffer.write(character)
continue
frame = self._buffer.getvalue()
self._reset()
yield self._parse(frame)
try:
self._stream.close()
finally:
yield self.SENTINEL
def _reset(self):
self._buffer = cStringIO.StringIO()
class SubprocessSession(object):
def __init__(self, command, writer, reader, start=True, **kwargs):
self._command = command
self._writer = writer
self._reader = reader
self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
self._kwargs.update(kwargs)
self._lock = threading.RLock()
if start:
self.start()
def __del__(self):
self.close()
def put(self, frame, request=True):
return self._communicator.put(frame, request)
def start(self):
with self._lock:
if hasattr(self, '_process'):
return
self._process = subprocess.Popen(self._command, **self._kwargs)
self._communicator = Communicator(
self._writer(self._process.stdin), self._writer.SENTINEL,
self._reader(self._process.stdout), self._reader.SENTINEL
)
def close(self, timeout=None):
with self._lock:
if not hasattr(self, '_process'):
return
try:
self._communicator.close(timeout)
finally:
if self._process.poll() is None:
self._process.kill()
del self._process
del self._communicator
# I use the part above as a generic utility module. A simple echo example follows:
class EchoWriter(Writer):
def _compile(self, requestId, command):
if command == self.SENTINEL:
return
return command if (requestId is None) else ('%s:%s\x00' % (requestId, command))
class EchoReader(Reader):
def _parse(self, frame):
requestId, frame = frame.split(':', 1)
return (int(requestId), frame)
def run():
from multiprocessing.pool import ThreadPool
session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)
pool = ThreadPool(50)
requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
results = pool.map(lambda r: r.get(), requests)
print results == ['message %d' % j for j in xrange(2000)]
if __name__ == '__main__':
run()