A coroutine-based wrapper for subprocess.Popen that uses asyncore to communicate with child processes asynchronously. This allows subprocesses to be called from within socket servers or clients without needing a complicated event loop that services both the sockets and the subprocess pipes. It uses recipe 576965 to provide the asynchronous coroutine framework, recipe 576967 to provide asynchronous pipes, and recipe 577600 to provide multiple alarms.
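To make the generator protocol concrete, here is a deliberately blocking sketch of the contract that `SubprocessExecutor` implements asynchronously. The generator yields either an argv list or a dict with an `argv` key and an optional `input` string, and it receives a `(stdout, stderr, returncode, pid)` tuple back through `send()`. The names `run_blocking` and `reverse_twice` are hypothetical, and the driver uses `subprocess.Popen.communicate()` instead of asyncore, so it ties up the calling thread while each child runs. It only illustrates the data flow, and it is written for Python 3 rather than the recipe's Python 2.

```python
import subprocess
import sys


def run_blocking(generator):
    """Drive a SubprocessExecutor-style generator synchronously (hypothetical helper).

    Each value the generator yields is either an argv sequence or a dict with an
    'argv' key and an optional 'input' string. The result of each child is sent
    back into the generator as (stdout, stderr, returncode, pid).
    """
    try:
        request = next(generator)
        while True:
            if isinstance(request, dict):
                argv, data = request['argv'], request.get('input')
            else:
                argv, data = request, None
            child = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                     universal_newlines=True)
            # communicate() writes the input, closes stdin, reads both pipes to
            # EOF and then waits for the child, so no zombie is left behind.
            out, err = child.communicate(data)
            request = generator.send((out, err, child.returncode, child.pid))
    except StopIteration:
        pass  # the generator has no more subprocesses to run


def reverse_twice(results):
    """Example generator: ask a child Python process to reverse a string, twice."""
    text = 'abc'
    script = 'import sys; print(sys.stdin.readline().strip()[::-1])'
    for _ in range(2):
        out, err, status, pid = yield {'argv': [sys.executable, '-c', script],
                                       'input': text + '\n'}
        results.append((out.strip(), status))
        text = out.strip()  # feed the reversed text to the next child


results = []
run_blocking(reverse_twice(results))
print(results)  # [('cba', 0), ('abc', 0)]
```

The asynchronous executor keeps exactly this shape; what changes is that each `send()` happens from inside `asyncore.loop()` once the child's output pipes have closed, so other I/O keeps running in the meantime.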
```python
#!/usr/bin/env python
"""asyncsubproc.py: Asynchronous subprocess communication using asyncore.

The `AsyncPopen` class wraps the I/O pipes from `Popen` in asynchronous
dispatchers, providing asynchronous communication with the subprocess using
`asyncore.loop()` to read and write in parallel with other I/O. The
`SubprocessExecutor` class wraps `AsyncPopen` in an `Executor`, allowing
inline subprocess execution using a generator.

Full-duplex Communication:

Data that the subprocess writes might not be made available to the parent until
the subprocess calls `flush()` or exits; thus, a parent which attempts to write
data, read a response, and then write new data contingent on the response might
find itself deadlocked. There seems to be no way for the parent process to
force flushing of the subprocess output; changing the value of the `bufsize`
parameter to `Popen()` to zero (or any other value) doesn't do it, and
`asyncore.file_dispatcher` already sets `O_NONBLOCK` on the pipes.

Subprocess Exit:

Detecting subprocess exit while avoiding zombie subprocesses can be tricky in
asynchronous code. Calling `wait()` on a subprocess would block, leaving three
alternatives for checking for subprocess exit:

1) Exit the asynchronous select loop (e.g. `asyncore.loop()`) occasionally
   to call `poll()` on any unterminated subprocesses. This requires maintaining
   a list of all unterminated subprocess objects, along with any context needed
   to handle the subprocess exit.

2) Set a handler for `SIGCHLD` which calls `os.waitpid(-1, os.WNOHANG)`,
   and then use the return value to locate the asynchronous process object and
   handle the subprocess exit. This must be done in a loop to avoid missing
   consolidated signals, requires maintaining a list of all unterminated
   subprocesses, and is limited by reentrancy restrictions on signal handlers.

3) Check whether `stdout` and `stderr` have both been closed, which can be done
   as part of the asynchronous loop which reads data. This requires that at
   least one of `stdout` and `stderr` be a pipe, but an asynchronous subprocess
   is probably unnecessary in the first place if neither is a pipe. There is no
   absolute guarantee that the subprocess has exited when `stdout` and `stderr`
   have closed, but once they have, no more data is coming. However, because
   `wait()` is not being called on the subprocesses, special care has to be
   taken to avoid leaving zombie subprocesses. There are again three
   alternatives:

   a) Set `SIGCHLD` to `SIG_IGN`. This should work on most varieties of UNIX,
      including Mac OS X. However, it prevents collecting the exit status of
      the subprocess; `poll()` will return `None` and `wait()` will raise an
      `OSError` exception.

   b) Set a handler for `SIGCHLD` as in solution (2) above; if this is to be
      implemented, it may be better to simply implement solution (2) rather
      than waiting for the output pipes to close in the first place.

   c) Call `wait()` on the subprocess after `stdout` and `stderr` are closed.
      While this will block (briefly), it should be reasonably safe unless the
      subprocess does something very unusual.

`SubprocessExecutor` waits for `stdout` and `stderr` to both be closed, and
then calls `wait()` on the subprocess if no handler for `SIGCHLD` is set.

References:
    http://code.activestate.com/recipes/577600/ [queued SIGALRM alarms]
    http://code.activestate.com/recipes/576965/ [event-based asynchronous pattern]
    http://code.activestate.com/recipes/576967/ [asynchronous pipe I/O]
"""

import os
import sys
import signal
import threading
from traceback import print_exc
from subprocess import Popen, PIPE
from logging import ERROR, INFO

import alarm
from asyncpipes import PipeDispatcher, InputPipeDispatcher, OutputPipeDispatcher
from worker import Executor
from observer import Observable

if __name__ == '__main__':
    import optparse
    from asyncore import loop
    from string import digits
    from time import sleep
    from worker import execute, ExecutionQueue

__version__ = '$Revision: 3414 $'.split()[1]

__usage__ = 'usage: %prog [options] [data]'


class AsyncPopen(Observable, Popen):
    """An extension to Popen which creates a subprocess with asynchronous
    pipes for input and output. Pipe output can be read using an Observer
    pattern while asyncore.loop() is run.

    Also contains additional small extensions, such as a subprocess timeout
    and a fix to handling of signals for subprocesses.
    """

    def __init__(self, argv, map=None, timeout=None, close_when_done=True,
            stdin=PIPE, stdout=PIPE, stderr=PIPE, preexec_fn=None, bufsize=0, **popen_keyw):
        """Accepts all the same arguments and keywords as `subprocess.Popen`.

        Inputs or outputs specified as `PIPE` (now the default) are wrapped
        in an asynchronous pipe dispatcher.

        The timeout is used to create an alarm, which can be cancelled by
        calling `cancel_timeout()`, `communicate()`, `wait()` or `kill()`.
        """
        Observable.__init__(self)
        self._map = map
        # Create the subprocess itself, wrapping preexec_fn in the clear_signals call
        Popen.__init__(self, argv, preexec_fn=lambda: self.clear_signals(preexec_fn),
                stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **popen_keyw)
        # Set the timeout on the subprocess. If it fails, ignore the failure.
        try:
            fto = float(timeout)
            self._alarmobj = alarm.alarm(fto, self.kill) if fto > 0 else None
        except:
            self._alarmobj = None
        # Wrap the pipe I/O. Sets the Popen and pipe buffer sizes the same; perhaps not optimal.
        if stdout == PIPE:
            self.stdout = OutputPipeDispatcher(self.stdout, map=map, ignore_broken_pipe=True,
                    universal_newlines=self.universal_newlines, maxdata=bufsize)
            self.stdout.obs_add(self._pipe_event)
        if stderr == PIPE:
            self.stderr = OutputPipeDispatcher(self.stderr, map=map, ignore_broken_pipe=True,
                    universal_newlines=self.universal_newlines, maxdata=bufsize)
            self.stderr.obs_add(self._pipe_event)
        if stdin == PIPE:
            self.stdin = InputPipeDispatcher(self.stdin, map=map, ignore_broken_pipe=True,
                    close_when_done=close_when_done, maxdata=bufsize)
            self.stdin.obs_add(self._pipe_event)

    def cancel_timeout(self, logger=None):
        """Cancel the pending timeout alarm for this subprocess, if any."""
        if not self._alarmobj: return
        try:
            alarm.cancel(self._alarmobj)
        except:
            if logger: logger.debug("Error canceling child PID %d alarm" % self.pid, exc_info=1)
        finally:
            self._alarmobj = None

    def wait(self, logger=None):
        """Wait for the subprocess to exit, then cancel its timeout."""
        returncode = Popen.wait(self)
        self.cancel_timeout(logger=logger)
        return returncode

    @staticmethod
    def clear_signals(preexec_fn):
        """Wraps any preexec_fn in order to clear any signal handlers."""
        for s in range(1, signal.NSIG):
            try:
                if s not in [signal.SIGKILL, signal.SIGSTOP]: signal.signal(s, signal.SIG_DFL)
            except:
                pass
        if callable(preexec_fn): preexec_fn()

    def kill(self):
        """Kill the child process with extreme prejudice."""
        try:
            if self.returncode is None: os.kill(self.pid, signal.SIGKILL)
        finally:
            self.cancel_timeout()

    def fetch_output(self, clear=True):
        """Fetch data from the subprocess output pipes.

        An output file not set to a pipe returns an empty string.
        """
        outdata = self.stdout.fetch_data(clear) if self.stdout is not None else ''
        errdata = self.stderr.fetch_data(clear) if self.stderr is not None else ''
        return outdata, errdata

    def output_closed(self):
        """Return true if both subprocess output pipes are closed.

        Can be used to detect the termination of the subprocess. An output
        file not set to a pipe is ignored.
        """
        outread = self.stdout.readable() if self.stdout is not None else False
        errread = self.stderr.readable() if self.stderr is not None else False
        return not (outread or errread)

    def _pipe_event(self, observed, event):
        """Forward events on the pipes. The forwarded events contain the pipe
        event and the pipe itself as a two-element tuple."""
        self._obs_notify((event, observed))


class SubprocessExecutor(Executor):
    """Executes subprocesses, reading and writing data using `asyncore`.

    For each subprocess to be created, the generator must yield either the
    object to be passed to the `argv` argument of the `Popen` constructor,
    or a dictionary containing a required `argv` key, an optional `input` key
    containing a string to be written to `stdin` of the subprocess, and keys
    corresponding to the keyword parameters to `AsyncPopen` (the same keywords
    as the `_child_spawn()` method).

    Once the subprocess has exited, the executor will call `send()` on the
    generator, passing a 4-element tuple containing the data read from
    `stdout` and `stderr`, the exit status (as returned by `Popen.wait()` or
    `Popen.poll()`, so possibly `None`), and the pid of the subprocess. The
    generator can then yield the parameters for another subprocess.
    """

    def __init__(self, generator, exc_handler=print_exc, logger=None, **async_popen_keyw):
        """Initialize a subprocess executor.

        Additional keyword parameters to this constructor (usually passed
        through the decorator) will be passed to `AsyncPopen`.
        """
        Executor.__init__(self, generator, exc_handler)
        self._logger = logger
        self.__async_popen_dict = async_popen_keyw
        self.__current_child = None

    def _execute(self, logger=None, **async_popen_keyw):
        """Iterate the generator to completion (in the calling thread).

        The generator must yield the parameters for the first subprocess,
        which will be passed to `_child_spawn()`.

        Additional keyword parameters passed to this object when called will
        be passed to `AsyncPopen` (and override values passed to this object's
        constructor).
        """
        self.__async_popen_dict.update(async_popen_keyw)
        if logger is not None: self._logger = logger
        # Get the command to be executed from the generator
        self.__coerce_and_spawn(self.next())

    def _pipe_closed(self, observed, event):
        """Called when one of the output pipes (stdout or stderr) is closed.

        Once both are closed, declare the subprocess finished and call
        `_child_exit()`.
        """
        if observed.output_closed(): self._child_exit(observed)

    def _child_exit(self, child):
        """Called once `stdout` and `stderr` are both closed.

        Cleans up the subprocess, and then passes the subprocess results to
        the generator by calling `send()`. If the generator yields parameters
        for another subprocess, calls `_child_spawn()`.
        """
        self.__current_child = None
        # Close stdin for the child, so that it knows it won't be getting more data
        try:
            if child.stdin is not None: child.stdin.close()
        except:
            if self._logger: self._logger.debug("Error closing stdin for PID %d" % child.pid, exc_info=1)
        # Default in case wait() fails below
        returncode = None
        # Wait for the child if there's no signal handler
        if signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL:
            try:
                # This will cancel the alarm
                returncode = child.wait(logger=self._logger)
            except:
                if self._logger: self._logger.debug("Error waiting for child PID %d" % child.pid, exc_info=1)
                else: print_exc(file=sys.stderr)
        else:
            child.cancel_timeout(logger=self._logger)
            # This next will return None unless an exit status injector has been set up.
            returncode = child.poll()
        # Extract the result from the child process, and move on with the executor
        try:
            outdata, errdata = child.fetch_output()
            child_result = (outdata, errdata, returncode, child.pid)
            if self._logger: self._logger.debug("PID %d exited with code %s" % (child.pid, returncode))
            self.__coerce_and_spawn(self.send(child_result))
        except:
            self.throw(*sys.exc_info())

    def close(self):
        """Kill the subprocess when closing the generator."""
        child = self.__current_child
        if child:
            try:
                child.kill()
            except:
                if self._logger: self._logger.exception("Error killing child PID %d" % child.pid)
                else: print_exc(file=sys.stderr)
            else:
                self.__current_child = None
        Executor.close(self)

    def __coerce_and_spawn(self, arg):
        """Coerce the argument into a call to `_child_spawn()`.

        A dictionary is expanded into keyword arguments; anything else is
        passed as `argv`. (Checking the type, rather than catching an
        exception, keeps errors raised while spawning from being masked.)
        """
        if isinstance(arg, dict):
            self._child_spawn(**arg)
        else:
            self._child_spawn(argv=arg)

    def _child_spawn(self, argv=None, input=None, **async_popen_keyw):
        """Create the subprocess and send the data to the input pipe. Called
        with the value(s) yielded by the generator.

        If a subprocess is to be spawned, the `argv` keyword must be supplied
        with a non-empty value. The value passed to the `input` keyword will
        be written to `stdin` of the subprocess.

        Additional keyword parameters passed to this method will
        be passed to `AsyncPopen` (and override values passed to this object's
        constructor).
        """
        if self.stopped(): return
        # Merge the keyword arguments together to pass to AsyncPopen
        async_popen_dict = self.__async_popen_dict.copy()
        async_popen_dict.update(async_popen_keyw)
        if input: async_popen_dict["stdin"] = PIPE
        # Create the subprocess itself
        if self._logger: self._logger.debug("Spawning subprocess %s" % argv)
        self.__current_child = AsyncPopen(argv, **async_popen_dict)
        if self._logger: self._logger.debug("Spawned subprocess %s with PID %d" % (argv, self.__current_child.pid))
        # Listen for both output pipes to close, and push the data to stdin
        self.__current_child.obs_add(self._pipe_closed, criteria=PipeDispatcher.PIPE_CLOSED)
        if input: self.__current_child.stdin.push_data(str(input))


if __name__ == '__main__':
    def printdata(data, pid, channame):
        print '[%d] %s %d bytes received: %r' % (pid, channame, len(data), data)

    execq = ExecutionQueue()

    @execute(execq, SubprocessExecutor)
    def spawn_child(argv, data, child, loops):
        """Spawn a cascade of subprocesses."""
        for lp in range(1, loops + 1):
            (stdout, stderr, stat, pid) = yield {'argv': argv, 'input': '%s%s' % (data, '\n')}
            printdata(stdout, pid, 'stdout')
            printdata(stderr, pid, 'stderr')
            print "Loop %d child %d [%d] exited with status %s" % (lp, child, pid, stat)
            if stat == 0 and data == stdout.rstrip()[::-1]: data = stdout[:-1]

    def run_child(pause, exitstat):
        """Run the subprocess code; a simple string inverter."""
        line = sys.stdin.readline().strip()
        sleep(pause / 2.0)
        # Write and close both pipes to show that it waits for exit anyway.
        print line[::-1]
        print >>sys.stderr, line
        sys.stdout.close()
        sys.stderr.close()
        sleep(pause / 2.0)
        sys.exit(exitstat)

    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--loops', type='int', metavar='N', default=3,
            help='Number of times to iterate each child [%default]')
    optparser.add_option('--children', type='int', metavar='N', default=3,
            help='Number of children to spawn [%default]')
    optparser.add_option('--timeout', type='float', metavar='SECONDS', default=10.0,
            help='Maximum time subprocess is allowed to run [%default sec]')
    optparser.add_option('--no-signal', dest='nosignal', action='store_true', default=False,
            help='Ignore signals from child processes.')
    childopts = optparse.OptionGroup(optparser, 'Child options')
    childopts.add_option('--child', action='store_true', help=optparse.SUPPRESS_HELP)
    childopts.add_option('--pause', type='float', metavar='SECONDS', default=2.0,
            help='Time to pause in the child process [%default sec]')
    childopts.add_option('--exitstat', type='int', metavar='STATUS', default=0,
            help='Child exit status [%default]')
    optparser.add_option_group(childopts)
    (options, args) = optparser.parse_args()

    if options.child:
        run_child(options.pause, options.exitstat)
    else:
        # Run the parent process code: start the first child and send data.
        if options.nosignal: signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        sys.argv.insert(1, '--child')
        # Create and queue the children, and then loop asyncore
        data = ' '.join(args) if len(args) else digits
        for ch in range(1, options.children + 1):
            spawn_child(sys.argv, data, ch, options.loops)(timeout=options.timeout)
        loop()
        os.system('ps -ef')
```
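The exit-detection strategy that `SubprocessExecutor` uses (option 3c in the module docstring: wait for both `stdout` and `stderr` to reach EOF, then call `wait()`) does not depend on asyncore itself. The following is a minimal Python 3 sketch for POSIX systems that calls `select.select()` directly on the pipe file descriptors instead of using the recipe's `PipeDispatcher` classes; the function name `collect_until_closed` is hypothetical. `select()` on pipes does not work on Windows.

```python
import os
import select
import subprocess
import sys


def collect_until_closed(child, chunk_size=4096):
    """Read child.stdout and child.stderr until both hit EOF, then reap the child.

    Returns (stdout_bytes, stderr_bytes, returncode).
    """
    buffers = {child.stdout.fileno(): [], child.stderr.fileno(): []}
    open_fds = set(buffers)
    while open_fds:
        # Block until at least one pipe has data or has been closed by the child.
        readable, _, _ = select.select(list(open_fds), [], [])
        for fd in readable:
            chunk = os.read(fd, chunk_size)
            if chunk:
                buffers[fd].append(chunk)
            else:
                # An empty read means EOF: the child closed its end of this pipe.
                open_fds.discard(fd)
    # No more output is coming, so wait() should return promptly unless the
    # child keeps running after closing its output (the caveat in option 3).
    returncode = child.wait()
    out = b''.join(buffers[child.stdout.fileno()])
    err = b''.join(buffers[child.stderr.fileno()])
    child.stdout.close()
    child.stderr.close()
    return out, err, returncode


script = 'import sys; sys.stdout.write("out"); sys.stderr.write("err"); sys.exit(2)'
child = subprocess.Popen([sys.executable, '-c', script],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result = collect_until_closed(child)
print(result)  # (b'out', b'err', 2)
```

In the recipe the equivalent `select()` is the one inside `asyncore.loop()`, which is what lets socket traffic and subprocess output share a single loop; this standalone version blocks until the child is done.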
I was writing an asyncore-based TCP/IP server which needed to call a large, ungainly existing program to talk to some hardware. The existing program can take a few seconds to complete each time it is called. (Yes, it would be nice to port the existing program into a library, call it in a separate thread, and so on, but that's not easy.)
I looked at recipes 440554 and 576759, but both looked as though they would require writing an event loop that alternately called asyncore.loop() and the recipe's own internal loop, each with a short timeout. I decided instead to try wrapping subprocess with asyncore, and after some work I had something that functioned tolerably well.
As noted in the module docstring, a child must flush() stdout to allow conversational communication with it. If the child does not flush stdout, the parent cannot read() the data until the child exits or fills its output buffer. Therefore, to avoid indefinite waits and deadlocks, the code does not support conversational communication with subprocesses. I'm not sure how to fix this issue [and suspect it can't be fixed], but if anyone knows how to force a flush of the child's stdout from the parent, please comment.
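When the child is a program you control, having it flush after every reply is enough to make a write/read/write exchange work. The minimal Python 3 sketch below is independent of the recipe's classes: the parent writes a line, reads the reply, and only then writes the next line. The child script (`CHILD`) and the `converse` helper are hypothetical. For a Python child, running the interpreter with `-u` has a similar effect to the explicit `flush()` calls. None of this helps with third-party programs that buffer their own output, which is the case the paragraph above is about.

```python
import subprocess
import sys

# Hypothetical child: reverse each input line and flush immediately, so the
# parent can read the reply before it sends the next line.
CHILD = r'''
import sys
while True:
    line = sys.stdin.readline()
    if not line:
        break
    sys.stdout.write(line.rstrip("\n")[::-1] + "\n")
    sys.stdout.flush()
'''


def converse(words):
    """Send each word to the child and collect its reply before sending the next."""
    child = subprocess.Popen([sys.executable, '-c', CHILD],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             universal_newlines=True)
    replies = []
    try:
        for word in words:
            child.stdin.write(word + '\n')
            child.stdin.flush()  # the parent must flush its side too
            replies.append(child.stdout.readline().rstrip('\n'))
    finally:
        child.stdin.close()      # EOF ends the child's read loop
        child.stdout.close()
        child.wait()             # reap the child so no zombie is left
    return replies


print(converse(['abc', 'hello']))  # ['cba', 'olleh']
```

If the `sys.stdout.flush()` line is removed from `CHILD`, the first `readline()` in the parent blocks, because the reply sits in the child's buffer until the child exits, and the child never exits because it is still waiting for more input.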
Revision 20: Completely overhauled the class structure, splitting the wrapping of the pipes out of SubprocessExecutor into a subclass of Popen (AsyncPopen), and changing the parameter yielded by the generator from an object to a simple dictionary.
Revision 21: After reviewing all the available documentation on zombie subprocesses, I finally got (I hope) the subprocess exit handling in SubprocessExecutor right.
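For comparison with the approach Revision 21 settles on, the `SIGCHLD`-handler alternative listed as option (2) in the module docstring might look like the following minimal Python 3 sketch for POSIX. The names `reaped`, `reap_children` and `run_and_reap` are hypothetical. The handler loops over `os.waitpid(-1, os.WNOHANG)` because several child exits can be coalesced into a single signal. One caveat: `waitpid(-1, ...)` reaps every child of the process, including children owned by other code such as `subprocess.Popen` objects, so this sketch starts its child with `os.spawnv()` rather than `Popen`.

```python
import os
import signal
import sys
import time

# Exit statuses collected by the handler, keyed by PID (hypothetical registry).
reaped = {}


def reap_children(signum, frame):
    """SIGCHLD handler: reap every child that has exited, without blocking."""
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            return  # no children left at all
        if pid == 0:
            return  # children exist, but none has exited yet
        if os.WIFEXITED(status):
            reaped[pid] = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            reaped[pid] = -os.WTERMSIG(status)


def run_and_reap(exit_code, timeout=10.0):
    """Start a child that exits with `exit_code` and let the handler reap it."""
    previous = signal.signal(signal.SIGCHLD, reap_children)
    try:
        pid = os.spawnv(os.P_NOWAIT, sys.executable,
                        [sys.executable, '-c', 'import sys; sys.exit(%d)' % exit_code])
        deadline = time.time() + timeout
        # Python runs the handler in the main thread between bytecodes, so this
        # loop only idles until the handler has recorded the child's PID.
        while pid not in reaped and time.time() < deadline:
            time.sleep(0.01)
        return reaped.pop(pid, None)
    finally:
        signal.signal(signal.SIGCHLD,
                      previous if previous is not None else signal.SIG_DFL)


print(run_and_reap(3))  # 3
```

This has to run in the main thread, since `signal.signal()` refuses to install handlers from other threads. It also shows the bookkeeping the docstring warns about: something still has to map each reaped PID back to the object waiting for it.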
Python 2.7.1+ on Ubuntu 11.04:

```
$ ./asyncsubproc.py
Traceback (most recent call last):
  File "./asyncsubproc.py", line 35, in <module>
    import alarm
ImportError: No module named alarm
```
Where is this alarm module from?
Recipe 577600. All required external modules (i.e. my code libraries) are listed in the recipe description.
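`asyncsubproc.py` uses only two calls from that module: `alarm.alarm(seconds, callback)`, which returns a handle, and `alarm.cancel(handle)`. If recipe 577600 is unavailable, a rough stand-in exposing those same two calls could be built on `threading.Timer`. This is a substitute technique, not recipe 577600 itself (which queues alarms on `SIGALRM`): here the callback (`AsyncPopen.kill` in the recipe) runs on a timer thread instead of in a signal handler. The module below is a hypothetical `alarm.py` to save next to the recipe.

```python
"""alarm.py: hypothetical threading-based stand-in for the two calls used by asyncsubproc.py."""
import threading


def alarm(seconds, callback, *args, **kwargs):
    """Call callback(*args, **kwargs) after `seconds`; return a handle for cancel()."""
    timer = threading.Timer(seconds, callback, args=args, kwargs=kwargs)
    timer.daemon = True  # do not keep the process alive just for a pending alarm
    timer.start()
    return timer


def cancel(handle):
    """Cancel a pending alarm; cancelling one that has already fired is harmless."""
    handle.cancel()
```

With this in place, `alarm.alarm(fto, self.kill)` in `AsyncPopen.__init__` arms the `SIGKILL` timeout, and `cancel_timeout()` disarms it through `alarm.cancel()`.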