Welcome, guest | Sign In | My Account | Store | Cart
import cStringIO
import itertools
import Queue
import subprocess
import threading

class Transporter(object):
   
def __init__(self, get, getSentinel, put, putSentinel):
       
self._thread = threading.Thread(target=self.loop, args=(get, getSentinel, put, putSentinel))
       
self._thread.daemon = True
       
self._thread.start()

   
@classmethod
   
def loop(cls, get, getSentinel, put, putSentinel):
       
for item in iter(get, getSentinel):
            put
(item)
        put
(putSentinel)

   
def join(self, timeout=None):
       
self._thread.join(timeout)

class Communicator(object):
   
def __init__(self, write, writeSentinel, read, readSentinel):
       
self._out = Queue.Queue()
       
self._write = write
       
self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
       
self._writeSentinel = writeSentinel

       
self._in = {}
       
self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
       
self._readSentinel = readSentinel

       
self._closing = False
       
self._lock = threading.RLock()
       
self._next = itertools.count().next

   
def __del__(self):
       
self.close()

   
def put(self, command, request=True):
       
with self._lock:
            deferredResult
= Queue.Queue() if request else None
           
if not self._closing:
               
self._out.put((deferredResult, command))
           
elif deferredResult:
                deferredResult
.put(StopIteration())
           
return deferredResult

   
def close(self, timeout=None):
       
self._shutdown()
       
self._writer.join(timeout)
       
self._reader.join(timeout)

   
def _notify(self, requestId, result):
       
self._in.pop(requestId).put(result)

   
def _onSend(self, item):
       
(deferredResult, command) = item
        requestId
= None
       
if deferredResult:
            requestId
= self._next()
           
self._in[requestId] = deferredResult

       
try:
           
self._write(requestId, command)
       
except StopIteration as e:
           
self._shutdown()
           
if deferredResult:
               
self._notify(requestId, e)
       
except Exception as e:
           
if deferredResult:
               
self._notify(requestId, e)

   
def _onReceive(self, item):
       
if item == self._readSentinel:
            frame
= item
           
self._shutdown()
           
self._writer.join()
            requestIds
= list(self._in)
       
else:
           
(requestId, frame) = item
            requestIds
= [requestId]

       
for requestId in requestIds:
           
self._notify(requestId, frame)

   
def _shutdown(self):
       
with self._lock:
           
if self._closing:
               
return
           
self.put(self._writeSentinel, request=False)
           
self._closing = True

class Writer(object):
    SENTINEL
= None

   
def __init__(self, stream):
       
self._stream = stream

   
def __call__(self, requestId, command):
        frame
= self._compile(requestId, command)
       
if frame is not None:
           
try:
               
self._stream.write(frame)
               
self._stream.flush()
           
except:
                command
= self.SENTINEL
       
if command == self.SENTINEL:
           
try:
               
self._stream.close()
           
finally:
               
raise StopIteration()

   
def _compile(self, requestId, command):
       
raise NotImplementedError()

class Reader(object):
    SENTINEL
= None
    FRAME_SEPARATOR
= '\x00'

   
def __init__(self, stream):
       
self._stream = stream
       
self.__read = self._read().next

   
def __call__(self):
       
return self.__read()

   
def _next(self):
       
try:
           
return self._stream.read(1)
       
except:
           
return ''

   
def _parse(self, frame):
       
raise NotImplementedError()

   
def _read(self):
       
self._reset()
       
for character in iter(self._next, ''):
           
if character != self.FRAME_SEPARATOR:
               
self._buffer.write(character)
               
continue
            frame
= self._buffer.getvalue()
           
self._reset()
           
yield self._parse(frame)
       
try:
           
self._stream.close()
       
finally:
           
yield self.SENTINEL

   
def _reset(self):
       
self._buffer = cStringIO.StringIO()

class SubprocessSession(object):
   
def __init__(self, command, writer, reader, start=True, **kwargs):
       
self._command = command
       
self._writer = writer
       
self._reader = reader
       
self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
       
self._kwargs.update(kwargs)

       
self._lock = threading.RLock()
       
if start:
           
self.start()

   
def __del__(self):
       
self.close()

   
def put(self, frame, request=True):
       
return self._communicator.put(frame, request)

   
def start(self):
       
with self._lock:
           
if hasattr(self, '_process'):
               
return
           
self._process = subprocess.Popen(self._command, **self._kwargs)
           
self._communicator = Communicator(
               
self._writer(self._process.stdin), self._writer.SENTINEL,
               
self._reader(self._process.stdout), self._reader.SENTINEL
           
)

   
def close(self, timeout=None):
       
with self._lock:
           
if not hasattr(self, '_process'):
               
return
           
try:
               
self._communicator.close(timeout)
           
finally:
               
if self._process.poll() is None:
                   
self._process.kill()
               
del self._process
               
del self._communicator

# I use the part above as a generic utility module. A simple echo example follows:

class EchoWriter(Writer):
   
def _compile(self, requestId, command):
       
if command == self.SENTINEL:
           
return
       
return command if (requestId is None) else ('%s:%s\x00' % (requestId, command))

class EchoReader(Reader):
   
def _parse(self, frame):
        requestId
, frame = frame.split(':', 1)
       
return (int(requestId), frame)

def run():
   
from multiprocessing.pool import ThreadPool

    session
= SubprocessSession('/bin/cat', EchoWriter, EchoReader)

    pool
= ThreadPool(50)
    requests
= pool.map(lambda j: session.put('message %d' % j), xrange(2000))
    results
= pool.map(lambda r: r.get(), requests)

   
print results == ['message %d' % j for j in xrange(2000)]

if __name__ == '__main__':
    run
()

Diff to Previous Revision

--- revision 2 2013-03-11 20:00:44
+++ revision 3 2013-03-13 06:06:16
@@ -22,7 +22,6 @@
 
class Communicator(object):
     
def __init__(self, write, writeSentinel, read, readSentinel):
         
self._out = Queue.Queue()
-
         
self._write = write
         
self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
         
self._writeSentinel = writeSentinel
@@ -31,6 +30,7 @@
         
self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
         
self._readSentinel = readSentinel
 
+        self._closing = False
         
self._lock = threading.RLock()
         
self._next = itertools.count().next
 
@@ -38,17 +38,21 @@
         
self.close()
 
     
def put(self, command, request=True):
-        deferredResult = Queue.Queue() if request else None
-        self._out.put((deferredResult, command))
-        return deferredResult
+        with self._lock:
+            deferredResult = Queue.Queue() if request else None
+            if not self._closing:
+                self._out.put((deferredResult, command))
+            elif deferredResult:
+                deferredResult.put(StopIteration())
+            return deferredResult
 
     
def close(self, timeout=None):
-        self.put(self._writeSentinel, request=False)
+        self._shutdown()
         
self._writer.join(timeout)
         
self._reader.join(timeout)
 
-    def _notify(self, requestId, frame):
-        self._in.pop(requestId).put(frame)
+    def _notify(self, requestId, result):
+        self._in.pop(requestId).put(result)
 
     
def _onSend(self, item):
         
(deferredResult, command) = item
@@ -59,13 +63,18 @@
 
         
try:
             
self._write(requestId, command)
-        except:
-            self.put(self._writeSentinel, request=False)
+        except StopIteration as e:
+            self._shutdown()
+            if deferredResult:
+                self._notify(requestId, e)
+        except Exception as e:
+            if deferredResult:
+                self._notify(requestId, e)
 
     
def _onReceive(self, item):
         
if item == self._readSentinel:
             frame
= item
-            self.put(self._writeSentinel, request=False)
+            self._shutdown()
             
self._writer.join()
             requestIds
= list(self._in)
         
else:
@@ -75,6 +84,13 @@
         
for requestId in requestIds:
             
self._notify(requestId, frame)
 
+    def _shutdown(self):
+        with self._lock:
+            if self._closing:
+                return
+            self.put(self._writeSentinel, request=False)
+            self._closing = True
+
 
class Writer(object):
     SENTINEL
= None
 
@@ -84,10 +100,16 @@
     
def __call__(self, requestId, command):
         frame
= self._compile(requestId, command)
         
if frame is not None:
-            self._stream.write(frame)
-            self._stream.flush()
+            try:
+                self._stream.write(frame)
+                self._stream.flush()
+            except:
+                command = self.SENTINEL
         
if command == self.SENTINEL:
-            self._stream.close()
+            try:
+                self._stream.close()
+            finally:
+                raise StopIteration()
 
     
def _compile(self, requestId, command):
         
raise NotImplementedError()
@@ -103,20 +125,28 @@
     
def __call__(self):
         
return self.__read()
 
+    def _next(self):
+        try:
+            return self._stream.read(1)
+        except:
+            return ''
+
     
def _parse(self, frame):
         
raise NotImplementedError()
 
     
def _read(self):
         
self._reset()
-        for character in iter(lambda: self._stream.read(1), ''):
+        for character in iter(self._next, ''):
             
if character != self.FRAME_SEPARATOR:
                 
self._buffer.write(character)
                 
continue
             frame
= self._buffer.getvalue()
             
self._reset()
             
yield self._parse(frame)
-        self._stream.close()
-        yield self.SENTINEL
+        try:
+            self._stream.close()
+        finally:
+            yield self.SENTINEL
 
     
def _reset(self):
         
self._buffer = cStringIO.StringIO()
@@ -161,6 +191,8 @@
                 
del self._process
                 
del self._communicator
 
+# I use the part above as a generic utility module. A simple echo example follows:
+
 
class EchoWriter(Writer):
     
def _compile(self, requestId, command):
         
if command == self.SENTINEL:

History