import cStringIO
import itertools
import Queue
import subprocess
import threading
class Transporter(object):
def __init__(self, get, getSentinel, put, putSentinel):
self._thread = threading.Thread(target=self.loop, args=(get, getSentinel, put, putSentinel))
self._thread.daemon = True
self._thread.start()
@classmethod
def loop(cls, get, getSentinel, put, putSentinel):
for item in iter(get, getSentinel):
put(item)
put(putSentinel)
def join(self, timeout=None):
self._thread.join(timeout)
class Communicator(object):
def __init__(self, write, writeSentinel, read, readSentinel):
self._out = Queue.Queue()
self._write = write
self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
self._writeSentinel = writeSentinel
self._in = {}
self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
self._readSentinel = readSentinel
self._closing = False
self._lock = threading.RLock()
self._next = itertools.count().next
def __del__(self):
self.close()
def put(self, command, request=True):
with self._lock:
deferredResult = Queue.Queue() if request else None
if not self._closing:
self._out.put((deferredResult, command))
elif deferredResult:
deferredResult.put(StopIteration())
return deferredResult
def close(self, timeout=None):
self._shutdown()
self._writer.join(timeout)
self._reader.join(timeout)
def _notify(self, requestId, result):
self._in.pop(requestId).put(result)
def _onSend(self, item):
(deferredResult, command) = item
requestId = None
if deferredResult:
requestId = self._next()
self._in[requestId] = deferredResult
try:
self._write(requestId, command)
except StopIteration as e:
self._shutdown()
if deferredResult:
self._notify(requestId, e)
except Exception as e:
if deferredResult:
self._notify(requestId, e)
def _onReceive(self, item):
if item == self._readSentinel:
frame = item
self._shutdown()
self._writer.join()
requestIds = list(self._in)
else:
(requestId, frame) = item
requestIds = [requestId]
for requestId in requestIds:
self._notify(requestId, frame)
def _shutdown(self):
with self._lock:
if self._closing:
return
self.put(self._writeSentinel, request=False)
self._closing = True
class Writer(object):
SENTINEL = None
def __init__(self, stream):
self._stream = stream
def __call__(self, requestId, command):
frame = self._compile(requestId, command)
if frame is not None:
try:
self._stream.write(frame)
self._stream.flush()
except:
command = self.SENTINEL
if command == self.SENTINEL:
try:
self._stream.close()
finally:
raise StopIteration()
def _compile(self, requestId, command):
raise NotImplementedError()
class Reader(object):
SENTINEL = None
FRAME_SEPARATOR = '\x00'
def __init__(self, stream):
self._stream = stream
self.__read = self._read().next
def __call__(self):
return self.__read()
def _next(self):
try:
return self._stream.read(1)
except:
return ''
def _parse(self, frame):
raise NotImplementedError()
def _read(self):
self._reset()
for character in iter(self._next, ''):
if character != self.FRAME_SEPARATOR:
self._buffer.write(character)
continue
frame = self._buffer.getvalue()
self._reset()
yield self._parse(frame)
try:
self._stream.close()
finally:
yield self.SENTINEL
def _reset(self):
self._buffer = cStringIO.StringIO()
class SubprocessSession(object):
def __init__(self, command, writer, reader, start=True, **kwargs):
self._command = command
self._writer = writer
self._reader = reader
self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
self._kwargs.update(kwargs)
self._lock = threading.RLock()
if start:
self.start()
def __del__(self):
self.close()
def put(self, frame, request=True):
return self._communicator.put(frame, request)
def start(self):
with self._lock:
if hasattr(self, '_process'):
return
self._process = subprocess.Popen(self._command, **self._kwargs)
self._communicator = Communicator(
self._writer(self._process.stdin), self._writer.SENTINEL,
self._reader(self._process.stdout), self._reader.SENTINEL
)
def close(self, timeout=None):
with self._lock:
if not hasattr(self, '_process'):
return
try:
self._communicator.close(timeout)
finally:
if self._process.poll() is None:
self._process.kill()
del self._process
del self._communicator
# I use the part above as a generic utility module. A simple echo example follows:
class EchoWriter(Writer):
def _compile(self, requestId, command):
if command == self.SENTINEL:
return
return command if (requestId is None) else ('%s:%s\x00' % (requestId, command))
class EchoReader(Reader):
def _parse(self, frame):
requestId, frame = frame.split(':', 1)
return (int(requestId), frame)
def run():
from multiprocessing.pool import ThreadPool
session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)
pool = ThreadPool(50)
requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
results = pool.map(lambda r: r.get(), requests)
print results == ['message %d' % j for j in xrange(2000)]
if __name__ == '__main__':
run()
Diff to Previous Revision
--- revision 2 2013-03-11 20:00:44
+++ revision 3 2013-03-13 06:06:16
@@ -22,7 +22,6 @@
class Communicator(object):
def __init__(self, write, writeSentinel, read, readSentinel):
self._out = Queue.Queue()
-
self._write = write
self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
self._writeSentinel = writeSentinel
@@ -31,6 +30,7 @@
self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
self._readSentinel = readSentinel
+ self._closing = False
self._lock = threading.RLock()
self._next = itertools.count().next
@@ -38,17 +38,21 @@
self.close()
def put(self, command, request=True):
- deferredResult = Queue.Queue() if request else None
- self._out.put((deferredResult, command))
- return deferredResult
+ with self._lock:
+ deferredResult = Queue.Queue() if request else None
+ if not self._closing:
+ self._out.put((deferredResult, command))
+ elif deferredResult:
+ deferredResult.put(StopIteration())
+ return deferredResult
def close(self, timeout=None):
- self.put(self._writeSentinel, request=False)
+ self._shutdown()
self._writer.join(timeout)
self._reader.join(timeout)
- def _notify(self, requestId, frame):
- self._in.pop(requestId).put(frame)
+ def _notify(self, requestId, result):
+ self._in.pop(requestId).put(result)
def _onSend(self, item):
(deferredResult, command) = item
@@ -59,13 +63,18 @@
try:
self._write(requestId, command)
- except:
- self.put(self._writeSentinel, request=False)
+ except StopIteration as e:
+ self._shutdown()
+ if deferredResult:
+ self._notify(requestId, e)
+ except Exception as e:
+ if deferredResult:
+ self._notify(requestId, e)
def _onReceive(self, item):
if item == self._readSentinel:
frame = item
- self.put(self._writeSentinel, request=False)
+ self._shutdown()
self._writer.join()
requestIds = list(self._in)
else:
@@ -75,6 +84,13 @@
for requestId in requestIds:
self._notify(requestId, frame)
+ def _shutdown(self):
+ with self._lock:
+ if self._closing:
+ return
+ self.put(self._writeSentinel, request=False)
+ self._closing = True
+
class Writer(object):
SENTINEL = None
@@ -84,10 +100,16 @@
def __call__(self, requestId, command):
frame = self._compile(requestId, command)
if frame is not None:
- self._stream.write(frame)
- self._stream.flush()
+ try:
+ self._stream.write(frame)
+ self._stream.flush()
+ except:
+ command = self.SENTINEL
if command == self.SENTINEL:
- self._stream.close()
+ try:
+ self._stream.close()
+ finally:
+ raise StopIteration()
def _compile(self, requestId, command):
raise NotImplementedError()
@@ -103,20 +125,28 @@
def __call__(self):
return self.__read()
+ def _next(self):
+ try:
+ return self._stream.read(1)
+ except:
+ return ''
+
def _parse(self, frame):
raise NotImplementedError()
def _read(self):
self._reset()
- for character in iter(lambda: self._stream.read(1), ''):
+ for character in iter(self._next, ''):
if character != self.FRAME_SEPARATOR:
self._buffer.write(character)
continue
frame = self._buffer.getvalue()
self._reset()
yield self._parse(frame)
- self._stream.close()
- yield self.SENTINEL
+ try:
+ self._stream.close()
+ finally:
+ yield self.SENTINEL
def _reset(self):
self._buffer = cStringIO.StringIO()
@@ -161,6 +191,8 @@
del self._process
del self._communicator
+# I use the part above as a generic utility module. A simple echo example follows:
+
class EchoWriter(Writer):
def _compile(self, requestId, command):
if command == self.SENTINEL: