This recipe shows how to domesticate another executable as a service in a subprocess.
import cStringIO
import itertools
import Queue
import subprocess
import threading


class Transporter(object):
    # Pumps items from get() to put() on a daemon thread until the get sentinel
    # appears, then forwards the put sentinel and stops.

    def __init__(self, get, getSentinel, put, putSentinel):
        self._thread = threading.Thread(target=self.loop, args=(get, getSentinel, put, putSentinel))
        self._thread.daemon = True
        self._thread.start()

    @classmethod
    def loop(cls, get, getSentinel, put, putSentinel):
        for item in iter(get, getSentinel):
            put(item)
        put(putSentinel)

    def join(self, timeout=None):
        self._thread.join(timeout)


class Communicator(object):
    # Correlates outgoing commands with incoming replies: put() hands back a
    # deferred result (a Queue.Queue) which the reader thread fills once the
    # frame with the matching request ID arrives.

    def __init__(self, write, writeSentinel, read, readSentinel):
        self._out = Queue.Queue()
        self._write = write
        self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
        self._writeSentinel = writeSentinel
        self._in = {}
        self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
        self._readSentinel = readSentinel
        self._closing = False
        self._lock = threading.RLock()
        self._next = itertools.count().next

    def __del__(self):
        self.close()

    def put(self, command, request=True):
        with self._lock:
            deferredResult = Queue.Queue() if request else None
            if not self._closing:
                self._out.put((deferredResult, command))
            elif deferredResult:
                deferredResult.put(StopIteration())
            return deferredResult

    def close(self, timeout=None):
        self._shutdown()
        self._writer.join(timeout)
        self._reader.join(timeout)

    def _notify(self, requestId, result):
        self._in.pop(requestId).put(result)

    def _onSend(self, item):
        (deferredResult, command) = item
        requestId = None
        if deferredResult:
            requestId = self._next()
            self._in[requestId] = deferredResult
        try:
            self._write(requestId, command)
        except StopIteration as e:
            self._shutdown()
            if deferredResult:
                self._notify(requestId, e)
        except Exception as e:
            if deferredResult:
                self._notify(requestId, e)

    def _onReceive(self, item):
        if item == self._readSentinel:
            # the stream has ended: release every pending deferred result
            frame = item
            self._shutdown()
            self._writer.join()
            requestIds = list(self._in)
        else:
            (requestId, frame) = item
            requestIds = [requestId]
        for requestId in requestIds:
            self._notify(requestId, frame)

    def _shutdown(self):
        with self._lock:
            if self._closing:
                return
            self.put(self._writeSentinel, request=False)
            self._closing = True


class Writer(object):
    # Serializes frames onto the outgoing stream; subclasses implement _compile().

    SENTINEL = None

    def __init__(self, stream):
        self._stream = stream

    def __call__(self, requestId, command):
        frame = self._compile(requestId, command)
        if frame is not None:
            try:
                self._stream.write(frame)
                self._stream.flush()
            except:
                command = self.SENTINEL
        if command == self.SENTINEL:
            try:
                self._stream.close()
            finally:
                raise StopIteration()

    def _compile(self, requestId, command):
        raise NotImplementedError()


class Reader(object):
    # Splits the incoming stream into frames; subclasses implement _parse().

    SENTINEL = None
    FRAME_SEPARATOR = '\x00'

    def __init__(self, stream):
        self._stream = stream
        self.__read = self._read().next

    def __call__(self):
        return self.__read()

    def _next(self):
        try:
            return self._stream.read(1)
        except:
            return ''

    def _parse(self, frame):
        raise NotImplementedError()

    def _read(self):
        self._reset()
        for character in iter(self._next, ''):
            if character != self.FRAME_SEPARATOR:
                self._buffer.write(character)
                continue
            frame = self._buffer.getvalue()
            self._reset()
            yield self._parse(frame)
        try:
            self._stream.close()
        finally:
            yield self.SENTINEL

    def _reset(self):
        self._buffer = cStringIO.StringIO()


class SubprocessSession(object):
    # Starts the child process and wires its stdin/stdout into a Communicator.

    def __init__(self, command, writer, reader, start=True, **kwargs):
        self._command = command
        self._writer = writer
        self._reader = reader
        self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
        self._kwargs.update(kwargs)
        self._lock = threading.RLock()
        if start:
            self.start()

    def __del__(self):
        self.close()

    def put(self, frame, request=True):
        return self._communicator.put(frame, request)

    def start(self):
        with self._lock:
            if hasattr(self, '_process'):
                return
            self._process = subprocess.Popen(self._command, **self._kwargs)
            self._communicator = Communicator(
                self._writer(self._process.stdin), self._writer.SENTINEL,
                self._reader(self._process.stdout), self._reader.SENTINEL
            )

    def close(self, timeout=None):
        with self._lock:
            if not hasattr(self, '_process'):
                return
            try:
                self._communicator.close(timeout)
            finally:
                if self._process.poll() is None:
                    self._process.kill()
                del self._process
                del self._communicator


# I use the part above as a generic utility module. A simple echo example follows:

class EchoWriter(Writer):

    def _compile(self, requestId, command):
        if command == self.SENTINEL:
            return
        return command if (requestId is None) else ('%s:%s\x00' % (requestId, command))


class EchoReader(Reader):

    def _parse(self, frame):
        requestId, frame = frame.split(':', 1)
        return (int(requestId), frame)


def run():
    from multiprocessing.pool import ThreadPool
    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)
    pool = ThreadPool(50)
    requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
    results = pool.map(lambda r: r.get(), requests)
    print results == ['message %d' % j for j in xrange(2000)]


if __name__ == '__main__':
    run()
The SubprocessSession implements a thread-safe put/get API. The reply is matched against the request via an integer correlation ID. The executable is expected to listen for requests on stdin and to deliver the results, along with the request ID, on stdout.
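The recipe leaves the subprocess side open: in the echo example, /bin/cat happens to work because the reply frames are byte-for-byte identical to the request frames. As an illustration only (the file name worker.py and the uppercasing payload are invented here), a dedicated worker speaking the same NUL-delimited 'requestId:payload' framing might look like this:

#!/usr/bin/env python
# Hypothetical worker-side sketch: read NUL-delimited 'requestId:payload'
# frames from stdin, do some work on the payload, and write the reply back
# to stdout with the same request ID, as EchoReader expects.
import sys

def frames(stream, separator='\x00'):
    parts = []
    for character in iter(lambda: stream.read(1), ''):
        if character != separator:
            parts.append(character)
            continue
        yield ''.join(parts)
        del parts[:]

def main():
    for frame in frames(sys.stdin):
        requestId, payload = frame.split(':', 1)
        result = payload.upper()  # placeholder for the real work
        sys.stdout.write('%s:%s\x00' % (requestId, result))
        sys.stdout.flush()

if __name__ == '__main__':
    main()

Such a worker could then be hosted with something like SubprocessSession(['python', 'worker.py'], EchoWriter, EchoReader).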
In my real-world problem, we wish to access functionality in a C++-based executable (B) from various Python applications (A). In our first attempt, we "imported" B via the Python/C API, which led to strong coupling and to compilation problems.
Instead, we let B provide a stream-based API and then domesticate B as a subprocess of A. We now incur a significant overhead for serialization, queuing and piping, but we gain a versatile interface: we are no longer restricted to running A with CPython, and we can access B's stream-based API from any application (also non-Python) and even remotely (via sockets). If speed and/or distributed usage were an issue, I'd probably go for ZeroMQ (see link below).
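To illustrate the last point, here is a rough sketch (not part of the recipe) of driving the same Communicator over a TCP socket instead of a pipe; HOST and PORT are placeholders, and the remote service is assumed to speak the same NUL-delimited framing as the echo example:

import socket

HOST, PORT = 'localhost', 9000  # placeholders; a compatible service must be listening here

sock = socket.create_connection((HOST, PORT))
reading = sock.makefile('rb')   # stream handed to the Reader
writing = sock.makefile('wb')   # stream handed to the Writer

communicator = Communicator(
    EchoWriter(writing), EchoWriter.SENTINEL,
    EchoReader(reading), EchoReader.SENTINEL,
)

deferred = communicator.put('ping')   # returns a Queue.Queue immediately
print deferred.get()                  # blocks until the reply frame arrives

# Closing the file objects does not shut the TCP stream down, so give the
# reader a timeout and close the socket explicitly afterwards.
communicator.close(timeout=1.0)
sock.close()

The Reader and Writer only require a file-like object with read/write/flush/close, which is why the same classes can serve pipes and sockets alike.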
See also
- http://code.activestate.com/recipes/576957-asynchronous-subprocess-using-asyncore/
- http://twistedmatrix.com/documents/current/api/twisted.internet.protocol.ProcessProtocol.html
- http://www.erlang.org/doc/tutorial/c_port.html
- http://docs.python.org/2/library/multiprocessing.html#multiprocessing-listeners-clients
- http://taotetek.net/2011/02/02/python-multiprocessing-with-zeromq/
Changes
- re-introduced the lock in Communicator.put(); it is needed to avoid the race condition that producers might still put items into the out queue after the writer has "killed itself" with the write sentinel: such items would land behind the sentinel and their deferred results would never be answered.
- delegated counting to writer thread (where a lock is no longer required)
- removed race condition (trigger all deferred results upon receiving read sentinel, including those which are still in the outgoing queue)
- keywords for subprocess.Popen
@Jan: This isn't Asynchronous. This is MultiThreaded. For an Asynchronous Process implementation (using subprocess and circuits) see https://bitbucket.org/circuits/circuits-dev/src/616fadb076f0/circuits/io/process.py?at=default --JamesMills / prologic
@James: "Asynchronous" programming means that the main thread gives up control over parts of the program flow. Therefore, multithreaded code is by definition asynchronous; see http://stackoverflow.com/questions/600795/asynchronous-vs-multithreading-is-there-a-difference. You are probably referring to "concurrent" programming, where the different program parts do not necessarily need to be executed in parallel. I nevertheless retitled the recipe. An event-based ("reactive") solution as in Twisted did not fit my needs, since my calling code is synchronous and needs to ask questions of B. I'll have a look into circuits.
@Jan: I didn't mean to be blunt in my first response! Sorry! There is a lot of overlap between the definitions of asynchronous, concurrent, etc.; however, most folks think of async in Python as meaning things like gevent, twisted, circuits, etc. ;)
Forgive me for the dumb question, but in:
results = pool.map(lambda r: r.get(), requests)
What is r? What's its type? session was defined just above, but r is not found anywhere.
I mean: of what type must r be in order to have a method called get?
OK, I think I understand now. Python can still be difficult for me.
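For anyone else wondering: each r is the Queue.Queue that session.put() hands back (the deferredResult created in Communicator.put()); its get() blocks until the reader thread delivers the reply with the matching request ID. Roughly:

deferred = session.put('message 0')   # a Queue.Queue, returned immediately
result = deferred.get()               # blocks until the matching reply arrives
print result                          # 'message 0' in the /bin/cat example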