This recipe shows how to domesticate another executable as a service in a subprocess.

Python, 219 lines
import cStringIO
import itertools
import Queue
import subprocess
import threading

class Transporter(object):
    def __init__(self, get, getSentinel, put, putSentinel):
        self._thread = threading.Thread(target=self.loop, args=(get, getSentinel, put, putSentinel))
        self._thread.daemon = True
        self._thread.start()

    @classmethod
    def loop(cls, get, getSentinel, put, putSentinel):
        for item in iter(get, getSentinel):
            put(item)
        put(putSentinel)

    def join(self, timeout=None):
        self._thread.join(timeout)

class Communicator(object):
    def __init__(self, write, writeSentinel, read, readSentinel):
        self._out = Queue.Queue()
        self._write = write
        self._writer = Transporter(self._out.get, (None, writeSentinel), self._onSend, (None, writeSentinel))
        self._writeSentinel = writeSentinel

        self._in = {}
        self._reader = Transporter(read, readSentinel, self._onReceive, readSentinel)
        self._readSentinel = readSentinel

        self._closing = False
        self._lock = threading.RLock()
        self._next = itertools.count().next

    def __del__(self):
        self.close()

    def put(self, command, request=True):
        with self._lock:
            deferredResult = Queue.Queue() if request else None
            if not self._closing:
                self._out.put((deferredResult, command))
            elif deferredResult:
                deferredResult.put(StopIteration())
            return deferredResult

    def close(self, timeout=None):
        self._shutdown()
        self._writer.join(timeout)
        self._reader.join(timeout)

    def _notify(self, requestId, result):
        self._in.pop(requestId).put(result)

    def _onSend(self, item):
        (deferredResult, command) = item
        requestId = None
        if deferredResult:
            requestId = self._next()
            self._in[requestId] = deferredResult

        try:
            self._write(requestId, command)
        except StopIteration as e:
            self._shutdown()
            if deferredResult:
                self._notify(requestId, e)
        except Exception as e:
            if deferredResult:
                self._notify(requestId, e)

    def _onReceive(self, item):
        if item == self._readSentinel:
            frame = item
            self._shutdown()
            self._writer.join()
            requestIds = list(self._in)
        else:
            (requestId, frame) = item
            requestIds = [requestId]

        for requestId in requestIds:
            self._notify(requestId, frame)

    def _shutdown(self):
        with self._lock:
            if self._closing:
                return
            self.put(self._writeSentinel, request=False)
            self._closing = True

class Writer(object):
    SENTINEL = None

    def __init__(self, stream):
        self._stream = stream

    def __call__(self, requestId, command):
        frame = self._compile(requestId, command)
        if frame is not None:
            try:
                self._stream.write(frame)
                self._stream.flush()
            except Exception:  # a failed write ends the session
                command = self.SENTINEL
        if command == self.SENTINEL:
            try:
                self._stream.close()
            finally:
                raise StopIteration()

    def _compile(self, requestId, command):
        raise NotImplementedError()

class Reader(object):
    SENTINEL = None
    FRAME_SEPARATOR = '\x00'

    def __init__(self, stream):
        self._stream = stream
        self.__read = self._read().next

    def __call__(self):
        return self.__read()

    def _next(self):
        try:
            return self._stream.read(1)
        except Exception:  # report a failed read as end of stream
            return ''

    def _parse(self, frame):
        raise NotImplementedError()

    def _read(self):
        self._reset()
        for character in iter(self._next, ''):
            if character != self.FRAME_SEPARATOR:
                self._buffer.write(character)
                continue
            frame = self._buffer.getvalue()
            self._reset()
            yield self._parse(frame)
        try:
            self._stream.close()
        finally:
            yield self.SENTINEL

    def _reset(self):
        self._buffer = cStringIO.StringIO()

class SubprocessSession(object):
    def __init__(self, command, writer, reader, start=True, **kwargs):
        self._command = command
        self._writer = writer
        self._reader = reader
        self._kwargs = {'stdin': subprocess.PIPE, 'stdout': subprocess.PIPE}
        self._kwargs.update(kwargs)

        self._lock = threading.RLock()
        if start:
            self.start()

    def __del__(self):
        self.close()

    def put(self, frame, request=True):
        return self._communicator.put(frame, request)

    def start(self):
        with self._lock:
            if hasattr(self, '_process'):
                return
            self._process = subprocess.Popen(self._command, **self._kwargs)
            self._communicator = Communicator(
                self._writer(self._process.stdin), self._writer.SENTINEL,
                self._reader(self._process.stdout), self._reader.SENTINEL
            )

    def close(self, timeout=None):
        with self._lock:
            if not hasattr(self, '_process'):
                return
            try:
                self._communicator.close(timeout)
            finally:
                if self._process.poll() is None:
                    self._process.kill()
                del self._process
                del self._communicator

# I use the part above as a generic utility module. A simple echo example follows:

class EchoWriter(Writer):
    def _compile(self, requestId, command):
        if command == self.SENTINEL:
            return
        return command if (requestId is None) else ('%s:%s\x00' % (requestId, command))

class EchoReader(Reader):
    def _parse(self, frame):
        requestId, frame = frame.split(':', 1)
        return (int(requestId), frame)

def run():
    from multiprocessing.pool import ThreadPool

    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)

    pool = ThreadPool(50)
    requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
    results = pool.map(lambda r: r.get(), requests)

    print results == ['message %d' % j for j in xrange(2000)]

if __name__ == '__main__':
    run()

The SubprocessSession implements a thread-safe put/get API. The reply is matched against the request via an integer correlation ID. The executable is expected to listen for requests on stdin and deliver the results along with the request ID on stdout.
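To make the expected behaviour of the executable concrete, here is a minimal sketch of the service side of the protocol used by the echo example (frames of the form `id:payload` terminated by `\x00`). It is written for Python 3 rather than the recipe's Python 2, and the names `serve` and `handle` are hypothetical, not part of the recipe.

```python
import io

FRAME_SEPARATOR = b"\x00"

def serve(stdin, stdout, handle):
    """Read `id:payload` frames from stdin and answer each with `id:result`."""
    buffer = bytearray()
    # Read byte by byte until end of stream, as the recipe's Reader does.
    for byte in iter(lambda: stdin.read(1), b""):
        if byte != FRAME_SEPARATOR:
            buffer += byte
            continue
        # A complete frame has arrived: split off the correlation ID.
        request_id, payload = bytes(buffer).split(b":", 1)
        buffer.clear()
        # Echo the ID back so the caller can match the reply to its request.
        stdout.write(request_id + b":" + handle(payload) + FRAME_SEPARATOR)
        stdout.flush()

# In-memory streams stand in for the real stdin/stdout pipes.
fake_stdin = io.BytesIO(b"0:abc\x001:xyz\x00")
fake_stdout = io.BytesIO()
serve(fake_stdin, fake_stdout, bytes.upper)
```

In a real service, `sys.stdin.buffer` and `sys.stdout.buffer` would take the place of the in-memory streams.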

In my real-world use case, we want to access functionality of a C++-based executable (B) from various Python applications (A). In our first attempt, we "imported" B via the Python/C API, which led to tight coupling and to compilation problems.

Instead, we let B provide a stream-based API and then domesticate B as a subprocess of A. This incurs a significant overhead for serialization, queuing and piping, but the interface is versatile: A is no longer restricted to CPython, and B's stream-based API can be accessed from any application (including non-Python ones) and even remotely (via sockets). If speed and/or distributed usage were an issue, I'd probably go for ZeroMQ (see link below).
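As a rough illustration of the remote case, the same `id:payload\x00` framing can travel over a socket instead of a pipe. The sketch below is Python 3 and uses a local `socket.socketpair()` with a hypothetical `echo_service` standing in for B; a real deployment would connect to a listening server instead.

```python
import socket
import threading

def echo_service(conn):
    """Hypothetical stand-in for B: echoes every byte back to the caller."""
    with conn, conn.makefile("rwb", buffering=0) as stream:
        for byte in iter(lambda: stream.read(1), b""):
            stream.write(byte)

def read_frame(stream):
    """Collect bytes up to the frame separator (or end of stream)."""
    frame = bytearray()
    while True:
        byte = stream.read(1)
        if byte in (b"", b"\x00"):
            return bytes(frame)
        frame += byte

client, server = socket.socketpair()
threading.Thread(target=echo_service, args=(server,), daemon=True).start()

# makefile() yields a file-like object, the kind of stream a Writer/Reader wraps.
with client, client.makefile("rwb", buffering=0) as stream:
    stream.write(b"7:hello\x00")
    reply = read_frame(stream)
```

The point of the sketch is only that the stream abstraction does not care whether the bytes come from a pipe or a socket.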

See also
Changes
  • re-introduced the lock in Communicator.put(); it is needed to avoid a race condition in which producers could still put items into the out queue after the writer has "killed itself" with the write sentinel
  • delegated counting to the writer thread (where a lock is no longer required)
  • removed a race condition (all deferred results are now triggered upon receiving the read sentinel, including those still in the outgoing queue)
  • added keyword arguments for subprocess.Popen
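The race mentioned in the first change can be shown in isolation. The following is a simplified Python 3 sketch, not the recipe's code: `ClosableQueue` is a hypothetical class that takes the same lock in `put()` and `close()`, so no item can be enqueued behind the sentinel.

```python
import queue
import threading

class ClosableQueue:
    SENTINEL = object()

    def __init__(self):
        self._queue = queue.Queue()
        self._closing = False
        self._lock = threading.Lock()

    def put(self, item):
        """Enqueue the item unless the queue is closing; report success."""
        with self._lock:
            if self._closing:
                return False
            self._queue.put(item)
            return True

    def close(self):
        """Enqueue the sentinel exactly once and refuse later items."""
        with self._lock:
            if not self._closing:
                self._queue.put(self.SENTINEL)
                self._closing = True

    def drain(self):
        """Return all items up to the sentinel (blocks until close() is called)."""
        return list(iter(self._queue.get, self.SENTINEL))

q = ClosableQueue()
accepted_before = q.put("first")
q.close()
accepted_after = q.put("late")   # rejected: the sentinel is already queued
items = q.drain()
```

Without the lock, a producer could pass the `_closing` check just before `close()` runs and then enqueue its item after the sentinel, where the consumer would never see it.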

6 comments

James Mills 11 years ago

@Jan: This isn't Asynchronous. This is MultiThreaded. For an Asynchronous Process implementation (using subprocess and circuits) see https://bitbucket.org/circuits/circuits-dev/src/616fadb076f0/circuits/io/process.py?at=default --JamesMills / prologic

Jan Müller (author) 11 years ago

@James: "Asynchronous" programming means that the main thread gives up control over parts of the program flow. Therefore, multithreaded code is by definition asynchronous, see http://stackoverflow.com/questions/600795/asynchronous-vs-multithreading-is-there-a-difference. You are probably referring to "concurrent" programming where the different program parts do not necessarily need to be executed in parallel. I nevertheless retitled the recipe. An event based ("reactive") solution as in Twisted did not fit my needs since it is synchronous code which needs to ask questions to B. I'll have a look into circuits.

James Mills 11 years ago

@Jan: I didn't mean to be blunt in my first response! Sorry! There is a lot of overlap between the definitions of asynchronous, concurrent, etc.; however, most folk think of async in Python to mean things like gevent, twisted, circuits, etc ;)

Mauro B. Bianc 11 years ago

Forgive me for the dumb question, but in:

results = pool.map(lambda r: r.get(), requests)

What is r? What's its type? session was defined just above, but r is not found anywhere.

Mauro B. Bianc 11 years ago

I mean: of what type must r be in order to have a method called get?

Mauro B. Bianc 11 years ago

OK, I think I understand now. Python can still be difficult, to me.
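
On the question of what `r` is: in the recipe, `session.put()` returns the `deferredResult` created in `Communicator.put()`, which is a `Queue.Queue`, and `r.get()` blocks until the reply has been placed into it. A stripped-down Python 3 sketch of that pattern follows; the `put` function here is a hypothetical stand-in for the session, with a thread faking the subprocess reply.

```python
import queue
import threading

def put(command):
    """Return a queue that will later receive the reply to `command`."""
    deferred = queue.Queue()
    # A worker thread stands in for the subprocess answering the request.
    worker = threading.Thread(target=lambda: deferred.put("echo: " + command))
    worker.start()
    return deferred

# Each request is a queue; get() waits for exactly one reply.
requests = [put("message %d" % j) for j in range(3)]
results = [r.get(timeout=5) for r in requests]
```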