Welcome, guest | Sign In | My Account | Store | Cart
import cStringIO
import itertools
import Queue
import subprocess
import threading

class Transporter(object):
    """Background pump that moves items from a source callable to a sink callable.

    A daemon thread is started on construction. It calls ``get()`` repeatedly
    and forwards every result to ``put(item)`` until ``get()`` returns a value
    equal to ``getSentinel``; it then calls ``put(putSentinel)`` once and ends.
    """

    def __init__(self, get, getSentinel, put, putSentinel):
        """Create and immediately start the pumping thread."""
        worker = threading.Thread(
            target=self.loop,
            args=(get, getSentinel, put, putSentinel),
        )
        # Daemon thread: it does not keep the interpreter alive at exit.
        worker.daemon = True
        self._thread = worker
        worker.start()

    @classmethod
    def loop(cls, get, getSentinel, put, putSentinel):
        """Forward items from ``get`` to ``put``, then emit ``putSentinel``."""
        # The two-argument form of iter() stops as soon as get() yields the
        # sentinel, so the sentinel itself is never forwarded by the loop.
        source = iter(get, getSentinel)
        for payload in source:
            put(payload)
        put(putSentinel)

    def join(self, timeout=None):
        """Wait up to ``timeout`` seconds (forever if None) for the thread to end."""
        self._thread.join(timeout)

class Communicator(object):
    """Match outgoing commands with incoming replies using two worker threads.

    A writer ``Transporter`` drains an internal queue and passes each entry to
    ``write(requestId, command)``. A reader ``Transporter`` repeatedly calls
    ``read()`` and routes every ``(requestId, frame)`` it returns to the result
    queue that was registered for that id when the command was sent.
    """

    def __init__(self, write, writeSentinel, read, readSentinel):
        """Initialise shared state, then start the writer and reader threads.

        write -- callable ``write(requestId, command)``; a StopIteration raised
                 by it is taken as "the outgoing channel is finished".
        writeSentinel -- command value that asks the writer to shut down.
        read -- zero-argument callable returning ``(requestId, frame)`` tuples,
                or ``readSentinel`` when the incoming channel is exhausted.
        readSentinel -- value that marks the end of the incoming channel.
        """
        # Every attribute used by _onSend/_onReceive must exist BEFORE the
        # threads are started: the reader thread may call _onReceive (which
        # touches _readSentinel, _lock, _closing and _in) immediately.
        self._out = Queue.Queue()
        self._in = {}
        self._write = write
        self._writeSentinel = writeSentinel
        self._readSentinel = readSentinel
        self._closing = False
        self._lock = threading.RLock()
        self._next = itertools.count().next

        # The writer is created first because _onReceive joins it on shutdown.
        self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
        self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)

    def __del__(self):
        self.close()

    def put(self, command, request=True):
        """Queue ``command`` for sending.

        Returns a ``Queue.Queue`` that will receive the result when ``request``
        is true, otherwise None. Once shutdown has begun nothing is queued any
        more; a requested result queue is pre-filled with a StopIteration
        instance instead.
        """
        with self._lock:
            deferredResult = Queue.Queue() if request else None
            if not self._closing:
                self._out.put((deferredResult, command))
            elif deferredResult is not None:
                deferredResult.put(StopIteration())
            return deferredResult

    def close(self, timeout=None):
        """Request shutdown and join the writer, then the reader thread.

        ``timeout`` is applied to each join separately.
        """
        self._shutdown()
        self._writer.join(timeout)
        self._reader.join(timeout)

    def _notify(self, requestId, result):
        """Deliver ``result`` to the pending request and forget the request."""
        self._in.pop(requestId).put(result)

    def _onSend(self, item):
        """Writer-thread callback: register the request and write the command."""
        (deferredResult, command) = item
        requestId = None
        if deferredResult is not None:
            # Register before writing so a reply can always find its queue.
            requestId = self._next()
            self._in[requestId] = deferredResult

        try:
            self._write(requestId, command)
        except Exception as e:
            # StopIteration means the outgoing channel is gone: begin shutdown.
            # Any failure is reported to the caller waiting on this request.
            if isinstance(e, StopIteration):
                self._shutdown()
            if deferredResult is not None:
                self._notify(requestId, e)

    def _onReceive(self, item):
        """Reader-thread callback: route a reply, or flush everything at EOF."""
        if item == self._readSentinel:
            frame = item
            self._shutdown()
            # Wait for the writer so no new request is registered while the
            # remaining ones are being released below.
            self._writer.join()
            requestIds = list(self._in)
        else:
            (requestId, frame) = item
            requestIds = [requestId]

        # At end of input every still-pending request receives the sentinel.
        for requestId in requestIds:
            self._notify(requestId, frame)

    def _shutdown(self):
        """Queue the write sentinel exactly once and refuse further commands."""
        with self._lock:
            if self._closing:
                return
            # put() re-acquires the RLock; the flag is set afterwards so the
            # sentinel itself is still accepted by put().
            self.put(self._writeSentinel, request=False)
            self._closing = True

class Writer(object):
    """Callable that serialises commands onto a stream.

    Subclasses implement ``_compile`` to turn ``(requestId, command)`` into the
    text written to the stream, or None when nothing should be written.
    """

    # Command value that requests the stream to be closed.
    SENTINEL = None

    def __init__(self, stream):
        self._stream = stream

    def __call__(self, requestId, command):
        """Write one command to the stream.

        Raises StopIteration after closing the stream when ``command`` equals
        ``SENTINEL`` or when writing/flushing fails; returns None otherwise.
        """
        frame = self._compile(requestId, command)
        if frame is not None:
            try:
                self._stream.write(frame)
                self._stream.flush()
            except Exception:
                # A broken stream is handled like an explicit shutdown request.
                # Only Exception is caught so that KeyboardInterrupt and
                # SystemExit are not silently turned into a shutdown.
                command = self.SENTINEL
        if command == self.SENTINEL:
            try:
                self._stream.close()
            except Exception:
                # Best effort: a failing close must not hide the shutdown.
                pass
            raise StopIteration()

    def _compile(self, requestId, command):
        """Return the text to write for ``command``, or None to write nothing."""
        raise NotImplementedError()

class Reader(object):
    """Callable that splits a stream into frames and parses each one.

    Each call returns the next parsed frame; once the stream is exhausted the
    stream is closed and ``SENTINEL`` is returned. Subclasses implement
    ``_parse`` to convert the raw frame text into the value handed out.
    """

    # Value returned once the stream has reached its end.
    SENTINEL = None
    # Character that terminates a frame on the stream.
    FRAME_SEPARATOR = '\x00'

    def __init__(self, stream):
        self._stream = stream
        # The generator is created now but only starts reading on first call.
        frames = self._read()
        self.__read = lambda: next(frames)

    def __call__(self):
        """Return the next parsed frame, or ``SENTINEL`` at end of stream."""
        return self.__read()

    def _next(self):
        """Read one character; a read error is reported as end of stream."""
        try:
            return self._stream.read(1)
        except Exception:
            # Only Exception is caught so KeyboardInterrupt/SystemExit still
            # propagate instead of being mistaken for end of stream.
            return ''

    def _parse(self, frame):
        """Convert raw frame text into the value returned to the caller."""
        raise NotImplementedError()

    def _read(self):
        """Generate parsed frames, then close the stream and yield ``SENTINEL``."""
        self._reset()
        for character in iter(self._next, ''):
            if character != self.FRAME_SEPARATOR:
                self._buffer.append(character)
                continue
            frame = ''.join(self._buffer)
            self._reset()
            yield self._parse(frame)
        # Characters after the last separator are an incomplete frame and are
        # not handed out.
        try:
            self._stream.close()
        except Exception:
            # Best effort: the sentinel must be delivered even if close fails.
            pass
        yield self.SENTINEL

    def _reset(self):
        """Start collecting a new frame."""
        # A plain list joined per frame accepts both byte and unicode
        # characters, which a cStringIO buffer does not.
        self._buffer = []

class SubprocessSession(object):
    """Run a child process and talk to it through a ``Communicator``.

    ``writer`` and ``reader`` are classes (or factories) that are called with
    the child's stdin and stdout respectively; their ``SENTINEL`` attributes
    are passed to the Communicator as the write and read sentinels.
    """

    def __init__(self, command, writer, reader, start=True, **kwargs):
        """Store the configuration and, unless ``start`` is false, launch.

        Extra keyword arguments are forwarded to ``subprocess.Popen`` and may
        override the default ``stdin``/``stdout`` pipes.
        """
        self._command = command
        self._writer = writer
        self._reader = reader
        self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
        self._kwargs.update(kwargs)

        self._lock = threading.RLock()
        if start:
            self.start()

    def __del__(self):
        self.close()

    def put(self, frame, request=True):
        """Forward ``frame`` to the Communicator; the session must be started."""
        return self._communicator.put(frame, request)

    def start(self):
        """Launch the child process and its Communicator; no-op if running."""
        with self._lock:
            if hasattr(self, '_process'):
                return
            process = subprocess.Popen(self._command, **self._kwargs)
            try:
                communicator = Communicator(
                    self._writer(process.stdin), self._writer.SENTINEL,
                    self._reader(process.stdout), self._reader.SENTINEL
                )
            except Exception:
                # Do not leave an orphaned child or a half-started session.
                self._terminate(process)
                raise
            # Publish both attributes together so close() never sees only one.
            self._process = process
            self._communicator = communicator

    def close(self, timeout=None):
        """Shut down the Communicator and make sure the child is gone.

        ``timeout`` is passed to ``Communicator.close``. A child that is still
        running afterwards is killed. Calling close on a session that is not
        running does nothing.
        """
        with self._lock:
            if not hasattr(self, '_process'):
                return
            process = self._process
            try:
                self._communicator.close(timeout)
            finally:
                # Reset the session state first so it can be restarted (or
                # closed again) even if terminating the child fails.
                del self._process
                del self._communicator
                self._terminate(process)

    @staticmethod
    def _terminate(process):
        """Kill ``process`` if it is still running and reap it."""
        if process.poll() is None:
            process.kill()
            # Collect the exit status so the killed child is not left behind
            # as a zombie process.
            process.wait()

# I use the part above as a generic utility module. A simple echo example follows:

class EchoWriter(Writer):
    """Writer for the echo example: frames requests as ``<id>:<command>\\x00``."""

    def _compile(self, requestId, command):
        """Return the text to send, or None for the shutdown sentinel."""
        if command == self.SENTINEL:
            return None
        if requestId is None:
            # NOTE(review): commands sent without a request id are written
            # verbatim, i.e. without the trailing frame separator - confirm
            # that this is intended.
            return command
        return '%s:%s\x00' % (requestId, command)

class EchoReader(Reader):
    """Reader for the echo example: parses ``<id>:<payload>`` frames."""

    def _parse(self, frame):
        """Split at the first colon and return ``(int(id), payload)``."""
        idText, payload = frame.split(':', 1)
        return (int(idText), payload)

def run():
    """Echo demo: send 2000 messages through ``/bin/cat`` from 50 threads.

    Prints True when every reply equals the message that was sent, in order.
    """
    from multiprocessing.pool import ThreadPool

    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)
    try:
        pool = ThreadPool(50)
        try:
            # First submit every message concurrently, then collect the
            # replies from the per-request result queues.
            requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
            results = pool.map(lambda r: r.get(), requests)
        finally:
            # Release the worker threads even if a request failed.
            pool.close()
            pool.join()
    finally:
        # Stop the communicator threads and the child process.
        session.close()

    print(results == ['message %d' % j for j in xrange(2000)])

# Run the echo demo only when this file is executed as a script.
if __name__ == '__main__':
    run()

Diff to Previous Revision

--- revision 2 2013-03-11 20:00:44
+++ revision 3 2013-03-13 06:06:16
@@ -22,7 +22,6 @@
 class Communicator(object):
     def __init__(self, write, writeSentinel, read, readSentinel):
         self._out = Queue.Queue()
-
         self._write = write
         self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
         self._writeSentinel = writeSentinel
@@ -31,6 +30,7 @@
         self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
         self._readSentinel = readSentinel
 
+        self._closing = False
         self._lock = threading.RLock()
         self._next = itertools.count().next
 
@@ -38,17 +38,21 @@
         self.close()
 
     def put(self, command, request=True):
-        deferredResult = Queue.Queue() if request else None
-        self._out.put((deferredResult, command))
-        return deferredResult
+        with self._lock:
+            deferredResult = Queue.Queue() if request else None
+            if not self._closing:
+                self._out.put((deferredResult, command))
+            elif deferredResult:
+                deferredResult.put(StopIteration())
+            return deferredResult
 
     def close(self, timeout=None):
-        self.put(self._writeSentinel, request=False)
+        self._shutdown()
         self._writer.join(timeout)
         self._reader.join(timeout)
 
-    def _notify(self, requestId, frame):
-        self._in.pop(requestId).put(frame)
+    def _notify(self, requestId, result):
+        self._in.pop(requestId).put(result)
 
     def _onSend(self, item):
         (deferredResult, command) = item
@@ -59,13 +63,18 @@
 
         try:
             self._write(requestId, command)
-        except:
-            self.put(self._writeSentinel, request=False)
+        except StopIteration as e:
+            self._shutdown()
+            if deferredResult:
+                self._notify(requestId, e)
+        except Exception as e:
+            if deferredResult:
+                self._notify(requestId, e)
 
     def _onReceive(self, item):
         if item == self._readSentinel:
             frame = item
-            self.put(self._writeSentinel, request=False)
+            self._shutdown()
             self._writer.join()
             requestIds = list(self._in)
         else:
@@ -75,6 +84,13 @@
         for requestId in requestIds:
             self._notify(requestId, frame)
 
+    def _shutdown(self):
+        with self._lock:
+            if self._closing:
+                return
+            self.put(self._writeSentinel, request=False)
+            self._closing = True
+
 class Writer(object):
     SENTINEL = None
 
@@ -84,10 +100,16 @@
     def __call__(self, requestId, command):
         frame = self._compile(requestId, command)
         if frame is not None:
-            self._stream.write(frame)
-            self._stream.flush()
+            try:
+                self._stream.write(frame)
+                self._stream.flush()
+            except:
+                command = self.SENTINEL
         if command == self.SENTINEL:
-            self._stream.close()
+            try:
+                self._stream.close()
+            finally:
+                raise StopIteration()
 
     def _compile(self, requestId, command):
         raise NotImplementedError()
@@ -103,20 +125,28 @@
     def __call__(self):
         return self.__read()
 
+    def _next(self):
+        try:
+            return self._stream.read(1)
+        except:
+            return ''
+
     def _parse(self, frame):
         raise NotImplementedError()
 
     def _read(self):
         self._reset()
-        for character in iter(lambda: self._stream.read(1), ''):
+        for character in iter(self._next, ''):
             if character != self.FRAME_SEPARATOR:
                 self._buffer.write(character)
                 continue
             frame = self._buffer.getvalue()
             self._reset()
             yield self._parse(frame)
-        self._stream.close()
-        yield self.SENTINEL
+        try:
+            self._stream.close()
+        finally:
+            yield self.SENTINEL
 
     def _reset(self):
         self._buffer = cStringIO.StringIO()
@@ -161,6 +191,8 @@
                 del self._process
                 del self._communicator
 
+# I use the part above as a generic utility module. A simple echo example follows:
+
 class EchoWriter(Writer):
     def _compile(self, requestId, command):
         if command == self.SENTINEL:

History