Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python

"""asyncsubproc.py: Asynchronous subprocess communication using asyncore.

The `AsyncPopen` class wraps the I/O pipes from `Popen` in asynchronous
dispatchers, providing asynchronous communication with the subprocess using
`asyncore.loop()` to read and write in parallel with other I/O.  The
`SubprocessExecutor` class wraps `AsyncPopen` in an `Executor`, allowing
inline subprocess execution using a generator.

Full-duplex Communication:
Data that the subprocess writes might not be made available to the parent until
the subprocess calls `flush()` or exits; thus, a parent which attempts to write
data, read a response, and then write new data contingent on the response might
find itself deadlocked. There seems to be no way for the parent process to
force flushing of the subprocess output; changing the value of the `bufsize`
parameter to `Popen()` to zero (or any other value) doesn't do it, and
`asyncore.file_dispatcher` already sets `O_NONBLOCK` on the pipes.

Subprocess Exit:
Detecting subprocess exit while avoiding zombie subprocesses can be tricky in
asynchronous code. Calling `wait()` on a subprocess would block, leaving three
alternatives for checking for subprocess exit:
    1) Exit the asynchronous select loop (e.g. `asyncore.loop()`) occasionally
to call `poll()` on any unterminated subprocesses. This requires maintaining a
list of all unterminated subprocess objects, along with any context needed to
handle the subprocess exit.
    2) Set a handler for `SIGCHLD` which calls `os.waitpid(-1, os.WNOHANG)`,
and then use the return value to locate the asynchronous process object and
handle the subprocess exit. This must be done in a loop to avoid missing
consolidated signals, requires maintaining a list of all unterminated
subprocesses, and is limited by reentrancy restrictions on signal handlers.
    3) Check for `stdout` and `stderr` to both be closed, which can be done as
part of the asynchronous loop which reads data. This requires that at least one
of `stdout` and `stderr` be a pipe, but an asynchronous subprocess is probably
unnecessary in the first place if neither is a pipe. There is no absolute
guarantee that the subprocess has exited when `stdout` and `stderr` have
closed, but once they have, no more data is coming. However, because `wait()`
is not being called on the subprocesses, special care has to be taken to avoid
leaving zombie subprocesses. There are again three alternatives:
    a) Set `SIGCHLD` to `SIG_IGN`. This should work on most varieties of UNIX
including Mac OS X. However, it prevents collecting the exit status of the
subprocess; `poll()` will return `None` and `wait()` will raise an `OSError`
exception.
    b) Set a handler for `SIGCHLD` as in solution (2) above; if this is to be
implemented, it may be better to simply implement solution (2) rather than
waiting for the output pipes to close in the first place.
    c) Call `wait()` on the subprocess after stdout and stderr are closed.
While this will block (briefly), it should be reasonably safe unless the
subprocess does something very unusual.
    `SubprocessExecutor` waits for `stdout` and `stderr` to both be closed, and
then calls `wait()` on the subprocess if no handler for `SIGCHLD` is set.

References:
http://code.activestate.com/recipes/577600/ [queued SIGALRM alarms]
http://code.activestate.com/recipes/576965/ [event-based asynchronous pattern]
http://code.activestate.com/recipes/576967/ [asynchronous pipe I/O]
"""

import os
import sys
import signal
import threading
from traceback import print_exc
from subprocess import Popen, PIPE
from logging import ERROR, INFO

import alarm
from asyncpipes import PipeDispatcher, InputPipeDispatcher, OutputPipeDispatcher
from worker import Executor
from observer import Observable

if __name__ == '__main__':
    import optparse
    from asyncore import loop
    from string import digits
    from time import sleep
    from worker import execute, ExecutionQueue

__version__ = '$Revision: 3414 $'.split()[1]

__usage__ = 'usage: %prog [options] [data]'


class AsyncPopen(Observable, Popen):
    """An extension to Popen which creates a subprocess with asynchronous
    pipes for input and output. Pipe output can be read using an Observer
    pattern while asyncore.loop() is run.

    Also contains additional small extensions, such as a subprocess timeout
    and a fix to handling of signals for subprocesses.
    """
    def __init__(self, argv, map=None, timeout=None, close_when_done=True,
            stdin=PIPE, stdout=PIPE, stderr=PIPE, preexec_fn=None, bufsize=0, **popen_keyw):
        """Accepts all the same arguments and keywords as `subprocess.Popen`.
        Inputs or outputs specified as `PIPE` (now the default) are wrapped
        in an asynchronous pipe dispatcher.

        The timeout is used to create an alarm, which can be cancelled by
        calling `cancel_timeout()`, `communicate()`, `wait()` or `kill()`.
        """
        Observable.__init__(self)
        self._map = map
        # Create the subprocess itself, wrapping preexec_fn in the clear_signals call
        Popen.__init__(self, argv, preexec_fn=lambda: self.clear_signals(preexec_fn),
                stdin=stdin, stdout=stdout, stderr=stderr, **popen_keyw)
        # Set the timeout on the subprocess.  A missing, non-numeric or
        # non-positive timeout means no alarm is set.
        try:
            fto = float(timeout)
            self._alarmobj = alarm.alarm(fto, self.kill) if fto > 0 else None
        except Exception:
            self._alarmobj = None
        # Wrap the pipe I/O. Sets the Popen and pipe buffer sizes the same; perhaps not optimal.
        if stdout == PIPE:
            self.stdout = OutputPipeDispatcher(self.stdout, map=map, ignore_broken_pipe=True,
                    universal_newlines=self.universal_newlines, maxdata=bufsize)
            self.stdout.obs_add(self._pipe_event)
        if stderr == PIPE:
            self.stderr = OutputPipeDispatcher(self.stderr, map=map, ignore_broken_pipe=True,
                    universal_newlines=self.universal_newlines, maxdata=bufsize)
            self.stderr.obs_add(self._pipe_event)
        if stdin == PIPE:
            self.stdin = InputPipeDispatcher(self.stdin, map=map, ignore_broken_pipe=True,
                    close_when_done=close_when_done, maxdata=bufsize)
            self.stdin.obs_add(self._pipe_event)

    def cancel_timeout(self, logger=None):
        """Cancel the pending kill alarm, if any; safe to call repeatedly."""
        if not self._alarmobj: return
        try:
            alarm.cancel(self._alarmobj)
        except Exception:
            # BUG FIX: was "child.pid" -- an undefined name in this scope --
            # which raised a NameError instead of logging the failure.
            if logger: logger.debug("Error canceling child PID %d alarm" % self.pid, exc_info=1)
        finally:
            # Drop the alarm object regardless, so a second call is a no-op.
            self._alarmobj = None

    def wait(self, logger=None):
        """Wait for the subprocess to exit, then cancel any pending alarm."""
        returncode = Popen.wait(self)
        self.cancel_timeout(logger=logger)
        return returncode

    @staticmethod
    def clear_signals(preexec_fn):
        """Wraps any preexec_fn in order to clear any signal handlers."""
        # Runs in the child after fork(): reset every resettable signal to
        # its default disposition so the child does not inherit the parent's
        # handlers (SIGKILL/SIGSTOP cannot be changed).
        for s in range(1, signal.NSIG):
            try:
                if s not in [signal.SIGKILL, signal.SIGSTOP]: signal.signal(s, signal.SIG_DFL)
            except Exception:
                # Some numbers in 1..NSIG are not valid signals on every platform.
                pass
        if callable(preexec_fn): preexec_fn()

    def kill(self):
        """Kill the child process with extreme prejudice."""
        try:
            # Only signal the child if it has not already been reaped.
            if self.returncode is None: os.kill(self.pid, signal.SIGKILL)
        finally:
            self.cancel_timeout()

    def fetch_output(self, clear=True):
        """Fetch data from the subprocess output pipes.

        An output file not set to a pipe returns an empty string.
        """
        outdata = self.stdout.fetch_data(clear) if self.stdout is not None else ''
        errdata = self.stderr.fetch_data(clear) if self.stderr is not None else ''
        return outdata, errdata

    def output_closed(self):
        """Return true if both subprocess output pipes are closed.

        Can be used to detect the termination of the subprocess. An output
        file not sent to a pipe is ignored.
        """
        outread = self.stdout.readable() if self.stdout is not None else False
        errread = self.stderr.readable() if self.stderr is not None else False
        return not (outread or errread)

    def _pipe_event(self, observed, event):
        """Forward events on the pipes. The forwarded events contain the pipe
        event and the pipe itself as a two-element tuple."""
        self._obs_notify((event, observed))


class SubprocessExecutor(Executor):
    """Executes subprocesses, reading and writing data using `asyncore`.

    For each subprocess to be created, the generator must yield either the
    object to be passed to the `argv` argument of the `Popen` constructor,
    or a dictionary containing a required `argv` key, an optional `input` key
    containing a string to be written to `stdin` of the subprocess, and keys
    corresponding to the keyword parameters to `AsyncPopen` (the same keywords
    as the `child_spawn()` method).

    Once the subprocess has exited, the executor will call `send()` on the
    generator, passing a 4-element tuple containing the data read from
    `stdout` and `stderr`, the exit status returned by `Popen.poll()`, and the
    pid of the subprocess. The generator can then yield the parameters for
    another subprocess.
    """
    def __init__(self, generator, exc_handler=print_exc, logger=None, **async_popen_keyw):
        """Initialize a subprocess executor.

        Additional keyword parameters to this constructor (usually passed
        through the decorator) will be passed to `AsyncPopen`.
        """
        Executor.__init__(self, generator, exc_handler)
        self._logger = logger
        self.__async_popen_dict = async_popen_keyw
        self.__current_child = None

    def _execute(self, logger=None, **async_popen_keyw):
        """Iterate the generator to completion (in the calling thread).

        The generator must yield the parameters for the first subprocess,
        which will be passed to `_spawn()`.

        Additional keyword parameters passed to this object when called will
        be passed to `AsyncPopen` (and override values passed to this object's
        constructor).
        """
        self.__async_popen_dict.update(async_popen_keyw)
        if logger is not None: self._logger = logger
        # Get the command to be executed from the generator
        self.__coerce_and_spawn(self.next())

    def _pipe_closed(self, observed, event):
        """Called when one of the output pipes (stdout or stderr) is closed.

        Once both are closed, declare the subprocess finished and call
        `_child_exit()`.
        """
        if observed.output_closed(): self._child_exit(observed)

    def _child_exit(self, child):
        """Called once `stdout` and `stderr` are both closed.

        Cleans up the subprocess, and then passes the subprocess results to
        the generator by calling `send()`.  If the generator yields parameters
        for another subprocess, calls `_child_spawn()`.
        """
        self.__current_child = None
        # Close stdin for the child, so that it knows it won't be getting more data
        try:
            if child.stdin is not None: child.stdin.close()
        except Exception:
            if self._logger: self._logger.debug("Error closing stdin for PID %d" % child.pid, exc_info=1)
        # BUG FIX: bind returncode up front; previously it was left unbound
        # (NameError below) whenever child.wait() raised in the except path.
        returncode = None
        # Wait for the child if there's no signal handler
        if signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL:
            try:
                # This will cancel the alarm
                returncode = child.wait(logger=self._logger)
            except Exception:
                if self._logger: self._logger.debug("Error waiting for child PID %d" % child.pid, exc_info=1)
                else: print_exc(file=sys.stderr)
        else:
            child.cancel_timeout(logger=self._logger)
            # This next will return None unless an exit status injector has been set up.
            returncode = child.poll()
        # Extract the result from the child process; and move on with the executor
        try:
            outdata, errdata = child.fetch_output()
            child_result = (outdata, errdata, returncode, child.pid)
            if self._logger: self._logger.debug("PID %d exited with code %s" % (child.pid, returncode))
            self.__coerce_and_spawn(self.send(child_result))
        except:
            # Intentionally broad: any failure (including from the generator
            # itself) is routed back into the generator via throw().
            self.throw(*sys.exc_info())

    def close(self):
        """Kill the subprocess when closing the generator."""
        child = self.__current_child
        if child:
            try:
                child.kill()
            except Exception:
                if self._logger: self._logger.exception("Error killing child PID %d" % child.pid)
                else: print_exc(file=sys.stderr)
            else:
                self.__current_child = None
        Executor.close(self)

    def __coerce_and_spawn(self, arg):
        """Coerce the argument into a call to `_child_spawn()`.

        BUG FIX: previously this wrapped `_child_spawn(**arg)` in a bare
        try/except and retried with `argv=arg` on ANY exception -- so a
        legitimate mapping argument whose spawn failed was spawned a second
        time with the mapping itself as argv. Decide by inspecting the
        argument instead.
        """
        if hasattr(arg, 'keys'):
            # A mapping: expand it into _child_spawn's keyword parameters.
            self._child_spawn(**arg)
        else:
            self._child_spawn(argv=arg)

    def _child_spawn(self, argv=None, input=None, **async_popen_keyw):
        """Create the subprocess and send the data to the input pipe. Called
        with the value(s) yielded by the generator.

        If a subprocess is to be spawned, the `argv` keyword must be supplied
        with a non-empty value. The value passed to the `input` keyword will
        be written to `stdin` of the subprocess.

        Additional keyword parameters passed to this method will
        be passed to `AsyncPopen` (and override values passed to this object's
        constructor).
        """
        if self.stopped(): return
        # Merge the keyword arguments together to pass to AsyncPopen
        async_popen_dict = self.__async_popen_dict.copy()
        async_popen_dict.update(async_popen_keyw)
        if input: async_popen_dict["stdin"] = PIPE
        # Create the subprocess itself
        if self._logger: self._logger.debug("Spawning subprocess %s" % argv)
        self.__current_child = AsyncPopen(argv, **async_popen_dict)
        if self._logger: self._logger.debug("Spawned subprocess %s with PID %d" % (argv, self.__current_child.pid))
        # Listen for both output pipes to close, and push the data to stdin
        self.__current_child.obs_add(self._pipe_closed, criteria=PipeDispatcher.PIPE_CLOSED)
        if input: self.__current_child.stdin.push_data(str(input))


if __name__ == '__main__':
    def printdata(data, pid, channame):
        print '[%d] %s %d bytes received: %r' % (pid, channame, len(data), data)

    execq = ExecutionQueue()

    @execute(execq, SubprocessExecutor)
    def spawn_child(argv, data, child, loops):
        """Spawn a cascade of subprocesses."""
        for lp in range(1, loops + 1):
            (stdout, stderr, stat, pid) = yield {'argv': argv, 'input': '%s%s' % (data, '\n')}
            printdata(stdout, pid, 'stdout')
            printdata(stderr, pid, 'stderr')
            print "Loop %d child %d [%d] exited with status %s" % (lp, child, pid, stat)
            if stat == 0 and data == stdout.rstrip()[::-1]: data = stdout[:-1]

    def run_child(pause, exitstat):
        """Child-process body: echo stdin reversed to stdout, unchanged to stderr."""
        text = sys.stdin.readline().strip()
        half_pause = pause / 2.0
        sleep(half_pause)
        # Write and close both pipes to show that it waits for exit anyway.
        sys.stdout.write(text[::-1] + '\n')
        sys.stderr.write(text + '\n')
        sys.stdout.close()
        sys.stderr.close()
        sleep(half_pause)
        sys.exit(exitstat)

    # Build the command-line interface for both parent and (hidden) child modes.
    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--loops', type='int', metavar='N', default=3,
            help='Number of times to iterate each child [%default]')
    optparser.add_option('--children', type='int', metavar='N', default=3,
            help='Number of children to spawn [%default]')
    optparser.add_option('--timeout', type='float', metavar='SECONDS', default=10.0,
            help='Maximum time subprocess is allowed to run [%default sec]')
    optparser.add_option('--no-signal', dest='nosignal', action='store_true', default=False,
            help='Ignore signals from child processes.')
    childopts = optparse.OptionGroup(optparser, 'Child options')
    childopts.add_option('--child', action='store_true', help=optparse.SUPPRESS_HELP)
    childopts.add_option('--pause', type='float', metavar='SECONDS', default=2.0,
            help='Time to pause in the child process [%default sec]')
    childopts.add_option('--exitstat', type='int', metavar='STATUS', default=0,
            help='Child exit status [%default]')
    optparser.add_option_group(childopts)
    options, args = optparser.parse_args()

    if options.child:
        # Child mode: just run the string inverter and exit.
        run_child(options.pause, options.exitstat)
    else:
        # Parent mode: optionally ignore SIGCHLD, then re-invoke ourselves
        # as children and drive them from the asyncore loop.
        if options.nosignal: signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        sys.argv.insert(1, '--child')
        data = ' '.join(args) if args else digits
        for child_num in range(1, options.children + 1):
            spawn_child(sys.argv, data, child_num, options.loops)(timeout=options.timeout)
        loop()
        # Show the process table so leftover zombies are visible.
        os.system('ps -ef')

Diff to Previous Revision

--- revision 20 2012-12-06 19:36:25
+++ revision 21 2013-01-21 19:51:00
@@ -8,22 +8,48 @@
 `SubprocessExecutor` class wraps `AsyncPopen` in an `Executor`, allowing
 inline subprocess execution using a generator.
 
-A warning about subprocess exit: when using `SubprocessExecutro`, one of the
-output files (`stdout` or `stderr`) must be a pipe. Because calling `wait()`
-on the subprocess would block, `SubprocessExecutor` checks for subprocess
-termination by waiting for `stdout` and `stderr` to close; this seems to work
-(at least for UNIX) even if `stdout` and `stderr` are closed by the
-subprocess. There is no guarantee that the subprocess has exited, but once
-`stdout` and `stderr` are closed, no more data is coming.
-
-A warning about attempting full-duplex communication with the subprocess: data
-that the subprocess writes might not be made available to the parent until the
-subprocess calls `flush()` or exits; thus, a parent which attempts to write
-data, read a response, and then write new data contingent on the response
-might find itself deadlocked. There seems to be no way for the parent process
-to force flushing of the subprocess output; changing the value of the
-`bufsize` parameter to `Popen()` to zero (or any other value) doesn't do it,
-and `asyncore.file_dispatcher` already sets `O_NONBLOCK` on the pipes.
+Full-duplex Communication:
+Data that the subprocess writes might not be made available to the parent until
+the subprocess calls `flush()` or exits; thus, a parent which attempts to write
+data, read a response, and then write new data contingent on the response might
+find itself deadlocked. There seems to be no way for the parent process to
+force flushing of the subprocess output; changing the value of the `bufsize`
+parameter to `Popen()` to zero (or any other value) doesn't do it, and
+`asyncore.file_dispatcher` already sets `O_NONBLOCK` on the pipes.
+
+Subprocess Exit:
+Detecting subprocess exit while avoiding zombie subprocesses can be tricky in
+asynchronous code. Calling `wait()` on a subprocess would block, leaving three
+alternatives for checking for subprocess exit:
+    1) Exit the asynchronous select loop (e.g. `asyncore.loop()`) occasionally
+to call `poll()` on any unterminated subprocesses. This requires maintaining a
+list of all unterminated subprocess objects, along with any context needed to
+handle the subprocess exit.
+    2) Set a handler for `SIGCHLD` which calls `os.waitpid(-1, os.WNOHANG)`,
+and then use the return value to locate the asynchronous process object and
+handle the subprocess exit. This must be done in a loop to avoid missing
+consolidated signals, requires maintaining a list of all unterminated
+subprocesses, and is limited by reentrancy restrictions on signal handlers.
+    3) Check for `stdout` and `stderr` to both be closed, which can be done as
+part of the asynchronous loop which reads data. This requires that at least one
+of `stdout` and `stderr` be a pipe, but an asynchronous subprocess is probably
+unnecessary in the first place if neither is a pipe. There is no absolute
+guarantee that the subprocess has exited when `stdout` and `stderr` have
+closed, but once they have, no more data is coming. However, because `wait()`
+is not being called on the subprocesses, special care has to be taken to avoid
+leaving zombie subproceses. There are again three alternatives:
+    a) Set `SIGCHLD` to `SIG_IGN`. This should work on most varieties of UNIX
+including Mac OS X. However, it prevents collecting the exit status of the
+subprocess; `poll()` will return `None` and `wait()` will raise an `OSError`
+exception.
+    b) Set a handler for `SIGCHLD` as in solution (2) above; if this is to be
+implemented, it may be better to simply implement solution (2) rather than
+waiting for the output pipes to close in the first place.
+    c) Call `wait()` on the subprocess after stdout and stderr are closed.
+While this will block (briefly), it should be reasonably safe unless the
+subprocess does something very unusual.
+    `SubprocessExecutor` waits for `stdout` and `stderr` to both be closed, and
+then calls `wait()` on the subprocess if no handler for `SIGCHLD` is set.
 
 References:
 http://code.activestate.com/recipes/577600/ [queued SIGALRM alarms]
@@ -51,7 +77,7 @@
     from time import sleep
     from worker import execute, ExecutionQueue
 
-__version__ = '$Revision: 3331 $'.split()[1]
+__version__ = '$Revision: 3414 $'.split()[1]
 
 __usage__ = 'usage: %prog [options] [data]'
 
@@ -98,18 +124,18 @@
                     close_when_done=close_when_done, maxdata=bufsize)
             self.stdin.obs_add(self._pipe_event)
 
-    def cancel_timeout(self):
+    def cancel_timeout(self, logger=None):
         if not self._alarmobj: return
         try:
             alarm.cancel(self._alarmobj)
         except:
-            pass
+            if logger: logger.debug("Error canceling child PID %d alarm" % child.pid, exc_info=1)
         finally:
             self._alarmobj = None
 
-    def wait(self):
+    def wait(self, logger=None):
         returncode = Popen.wait(self)
-        self.cancel_timeout()
+        self.cancel_timeout(logger=logger)
         return returncode
 
     @staticmethod
@@ -129,6 +155,25 @@
         finally:
             self.cancel_timeout()
 
+    def fetch_output(self, clear=True):
+        """Fetch data from the subprocess output pipes.
+
+        An output file not set to a pipe returns an empty string.
+        """
+        outdata = self.stdout.fetch_data(clear) if self.stdout is not None else ''
+        errdata = self.stderr.fetch_data(clear) if self.stderr is not None else ''
+        return outdata, errdata
+
+    def output_closed(self):
+        """Return true if both subprocess output pipes are closed.
+
+        Can be used to detected the termination of the subprocess. An output
+        file not sent to a pipe is ignored.
+        """
+        outread = self.stdout.readable() if self.stdout is not None else False
+        errread = self.stderr.readable() if self.stderr is not None else False
+        return not (outread or errread)
+
     def _pipe_event(self, observed, event):
         """Forward events on the pipes. The forwarded events contain the pipe
         event and the pipe itself as a two-element tuple."""
@@ -183,10 +228,7 @@
         Once both are closed, declare the subprocess finished and call
         `_child_exit()`.
         """
-        outread = observed.stdout.readable() if observed.stdout is not None else False
-        errread = observed.stderr.readable() if observed.stderr is not None else False
-        if self._logger: self._logger.debug("PID %d event %s: stdout %s stderr %s" % (observed.pid, event, outread, errread))
-        if not (outread or errread): self._child_exit(observed)
+        if observed.output_closed(): self._child_exit(observed)
 
     def _child_exit(self, child):
         """Called once `stdout` and `stderr` are both closed.
@@ -194,32 +236,28 @@
         Cleans up the subprocess, and then passes the subprocess results tom
         the generator by calling `send()`.  If the generator yields parameters
         for another subprocess, calls `_child_spawn()`.
-
-        At this point, stdout and stderr are closed, so we won't be getting
-        (or sending) any more data. However, we face a conundrum; we can't
-        `wait()` for the child, because that will stall the server if the
-        child is in fact frozen; not even in a separate thread, because that
-        deadlocks if we try to create a new `Popen()` child before the thread
-        exits (experimentally verified), and we shouldn't leave the alarm
-        running to kill the child later, because another process might reuse
-        the PID. So we make a best effort to clean up and move on.
         """
         self.__current_child = None
         # Close stdin for the child, so that it knows it won't be getting more data
         try:
             if child.stdin is not None: child.stdin.close()
         except:
-            if self._logger: self._logger.debug("Error closing stdin for PID %d" % child.pid)
-        # Kill the child directly just in case it's still running
-        try:
-            child.kill()
-        except:
-            if self._logger: self._logger.debug("Error killing child PID %d" % child.pid)
+            if self._logger: self._logger.debug("Error closing stdin for PID %d" % child.pid, exc_info=1)
+        # Wait for the child if there's no signal handler
+        if signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL:
+            try:
+                # This will cancel the alarm
+                returncode = child.wait(logger=self._logger)
+            except:
+                if self._logger: self._logger.debug("Error waiting for child PID %d" % child.pid, exc_info=1)
+                else: print_exc(file=sys.stderr)
+        else:
+            child.cancel_timeout(logger=self._logger)
+            # This next will return None unless an exit status injector has been set up.
+            returncode = child.poll()
         # Extract the result from the child process; and move on with the executor
         try:
-            returncode = child.poll()
-            outdata = child.stdout.fetch_data() if child.stdout is not None else ''
-            errdata = child.stderr.fetch_data() if child.stderr is not None else ''
+            outdata, errdata = child.fetch_output()
             child_result = (outdata, errdata, returncode, child.pid)
             if self._logger: self._logger.debug("PID %d exited with code %s" % (child.pid, returncode))
             self.__coerce_and_spawn(self.send(child_result))
@@ -288,7 +326,7 @@
             print "Loop %d child %d [%d] exited with status %s" % (lp, child, pid, stat)
             if stat == 0 and data == stdout.rstrip()[::-1]: data = stdout[:-1]
 
-    def run_child(pause=2.0, exitstat=0):
+    def run_child(pause, exitstat):
         """Run the subprocess code; a simple string inverter."""
         line = sys.stdin.readline().strip()
         sleep(pause / 2.0)
@@ -307,7 +345,9 @@
     optparser.add_option('--children', type='int', metavar='N', default=3,
             help='Number of children to spawn [%default]')
     optparser.add_option('--timeout', type='float', metavar='SECONDS', default=10.0,
-            help='Maximum time subprocess is allowed to run  [%default sec]')
+            help='Maximum time subprocess is allowed to run [%default sec]')
+    optparser.add_option('--no-signal', dest='nosignal', action='store_true', default=False,
+            help='Ignore signals from child processes.')
     childopts = optparse.OptionGroup(optparser, 'Child options')
     childopts.add_option('--child', action='store_true', help=optparse.SUPPRESS_HELP)
     childopts.add_option('--pause', type='float', metavar='SECONDS', default=2.0,
@@ -316,16 +356,16 @@
             help='Child exit status [%default]')
     optparser.add_option_group(childopts)
     (options, args) = optparser.parse_args()
-    # Return options as dictionary.
-    optdict = lambda *args: dict([(k, getattr(options, k)) for k in args])
 
     if options.child:
-        run_child(**optdict('pause','exitstat'))
+        run_child(options.pause, options.exitstat)
     else:
         # Run the parent process code: start the first child and send data.
+        if options.nosignal: signal.signal(signal.SIGCHLD, signal.SIG_IGN)
         sys.argv.insert(1, '--child')
         # Create and queue the children, and then loop asyncore
         data = ' '.join(args) if len(args) else digits
         for ch in range(1, options.children + 1):
             spawn_child(sys.argv, data, ch, options.loops)(timeout=options.timeout)
         loop()
+        os.system('ps -ef')

History