A procedure that runs a function asynchronously in a forked process (Availability: Macintosh, Unix). The return value of the specified function is pickled into an anonymous memory map (mmap; requires Python 2.5+). This can be useful for releasing resources used by the function (such as memory), for updating a GUI or CLI widget while the function runs, or for other weirdness.
#!/usr/bin/env python
import sys, os, cPickle, time, mmap
# Spinner frames that Busy.next() cycles through, one per call.
TICKS = tuple('|/\\')
# The ASCII escape character (code 27); Busy uses it to build its terminal
# control sequences.
ESC = '\x1b'
class Busy(object):
    """Console wait indicator shown while a child process is running.

    Instances are created with the child's pid, have next() called once per
    poll to advance the spinner, and stop(status) called once the child has
    exited.  The spinner is drawn with ANSI control sequences: ESC[s saves
    the cursor position, ESC[K erases to the end of the line and ESC[u
    restores the saved position, so every frame overwrites the previous one.
    """

    def __init__(self, pid):
        """Print the banner for child `pid` and draw the first frame."""
        self.tick = -1
        sys.stdout.write('Running Child(pid:%d)...%s[s' % (pid, ESC))
        self.next()

    def next(self):
        """Advance the spinner by one frame and flush it to the terminal."""
        self.tick += 1
        # Index by the real number of frames so TICKS can be changed freely.
        frame = TICKS[self.tick % len(TICKS)]
        sys.stdout.write('%s[K %s%s[u' % (ESC, frame, ESC))
        sys.stdout.flush()

    def stop(self, status):
        """Overwrite the spinner with the child's exit status."""
        sys.stdout.write('Done(status: %d)\n' % status)
        # Flush so the status line is visible even when stdout is buffered.
        sys.stdout.flush()
class ForkedProcessException(Exception):
    """Error raised when running a function in a forked child goes wrong.

    Used for a cancelled run, a child killed by a signal, and a result that
    does not fit into the shared memory map.
    """
# Seconds the parent sleeps between polls of the child.  Without a pause the
# wait loop spins a CPU core and floods the terminal with spinner frames.
_POLL_INTERVAL = 0.1


def _pickled(obj):
    """Return `obj` serialized with the highest available pickle protocol."""
    return cPickle.dumps(obj, cPickle.HIGHEST_PROTOCOL)


def _store_outcome(mm, mmsize, status, outcome):
    """Write the pickled `outcome` at the start of `mm`; return the exit status.

    The payload is serialized completely before anything is written, so the
    map never holds a half-written pickle.  The status becomes 2 when
    `outcome` cannot be pickled (the pickling error is stored instead) and 3
    when the pickle is larger than the map (a ForkedProcessException naming
    the required size is stored instead).
    """
    try:
        data = _pickled(outcome)
    except Exception:
        status = 2
        err = sys.exc_info()[1]
        try:
            data = _pickled(err)
        except Exception:
            # Even the pickling error is unpicklable: send its text instead.
            data = _pickled(ForkedProcessException(
                '%s: %s' % (err.__class__.__name__, err)))
    if len(data) > mmsize:
        status = 3
        data = _pickled(ForkedProcessException(
            'mmsize: %d, need: %d' % (mmsize, len(data))))
    mm.seek(0)
    mm.write(data)
    return status


def _child_main(mm, mmsize, func, args, kwds):
    """Child side: run `func`, store its outcome in `mm`, then exit.

    This function never returns.  Whatever happens, the child leaves through
    os._exit() so that it can never fall back into the copy of the parent's
    call stack it inherited from fork().
    """
    status = 1  # reported if something unforeseen escapes below
    try:
        try:
            outcome = func(*args, **kwds)
            status = 0
        except KeyboardInterrupt:
            status, outcome = 4, ForkedProcessException('User cancelled!')
        except SystemExit:
            # Honour sys.exit() in func, but without unwinding any further;
            # nothing is stored, which the parent reports as an error.
            code = sys.exc_info()[1].code
            if code is None:
                status = 0
            elif isinstance(code, int):
                status = code & 0xFF
            else:
                status = 1
            return
        except Exception:
            status, outcome = 1, sys.exc_info()[1]
        status = _store_outcome(mm, mmsize, status, outcome)
    finally:
        os._exit(status)


def _wait_for_child(pid, busy):
    """Poll child `pid` until it exits and return its raw wait status.

    busy.next() is called once per poll.  A KeyboardInterrupt terminates and
    reaps the child before ForkedProcessException is raised, so no orphan or
    zombie process is left behind.
    """
    import signal
    try:
        while True:
            busy.next()
            wpid, wstatus = os.waitpid(pid, os.WNOHANG)
            if wpid == pid:
                return wstatus
            time.sleep(_POLL_INTERVAL)
    except KeyboardInterrupt:
        try:
            os.kill(pid, signal.SIGTERM)
            os.waitpid(pid, 0)
        except OSError:
            pass  # the child is already gone or has already been reaped
        raise ForkedProcessException('User cancelled!')


def run_in_separate_process(waitclass, func, *args, **kwds):
    """Run func(*args, **kwds) in a forked child and return its result.

    The child pickles its outcome into an anonymous memory map shared with
    the parent and reports how it went through its exit status:

        0  success, the map holds the return value
        1  func raised an exception, the map holds that exception
        2  the outcome could not be pickled, the map holds the pickling error
        3  the pickled outcome did not fit into the map
        4  the child was interrupted (KeyboardInterrupt)

    Parameters:
        waitclass -- called as waitclass(pid) in the parent; the object it
                     returns must provide next(), called once per poll, and
                     stop(status), called once the child has exited.
        func      -- the callable to run in the child.
        mmsize    -- optional keyword giving the size of the memory map in
                     bytes (never less than 1024).  It is consumed here and
                     is not passed on to func.

    Returns whatever func returned.

    Raises:
        the exception stored by the child when its exit status is non-zero;
        ForkedProcessException when the user cancels, the child is killed by
        a signal, or the child exits without leaving a readable result;
        RuntimeError when the child's wait status is neither a normal exit
        nor a signal.
    """
    mmsize = max(kwds.pop('mmsize', 1024), 1024)
    mm = mmap.mmap(-1, mmsize)
    try:
        pid = os.fork()
        if pid == 0:
            _child_main(mm, mmsize, func, args, kwds)  # never returns
        busy = waitclass(pid)
        wstatus = _wait_for_child(pid, busy)
        if os.WIFSIGNALED(wstatus):
            raise ForkedProcessException(
                'Child killed by signal: %d' % os.WTERMSIG(wstatus))
        if not os.WIFEXITED(wstatus):
            raise RuntimeError('Unknown child exit status!')
        status = os.WEXITSTATUS(wstatus)
        busy.stop(status)
        mm.seek(0)
        try:
            result = cPickle.load(mm)
        except Exception:
            # The child left without storing anything (for example func
            # called sys.exit()) or what it stored cannot be loaded here.
            raise ForkedProcessException(
                'Child exited with status %d without a readable result: %s'
                % (status, sys.exc_info()[1]))
    finally:
        # Only the parent gets here; the child always leaves via os._exit().
        mm.close()
    if status == 0:
        return result
    if isinstance(result, BaseException):
        raise result
    raise ForkedProcessException(
        'Child exited with status %d: %r' % (status, result))
# Functions to run in a separate process
def treble(x, fail=False):
    """Return three times `x`, or divide by zero first when `fail` is true."""
    if fail:
        1/0  # deliberate ZeroDivisionError to exercise the failure path
    tripled = 3 * x
    return tripled
def suicide():
    """Send signal 15 (SIGTERM on POSIX systems) to the calling process."""
    own_pid = os.getpid()
    os.kill(own_pid, 15)
def toobig():
    """Return a 1100-character string, more than the default 1024-byte map holds."""
    digits = '1234567890'
    return digits * 110
def nocanpickle():
    """Return this module's global namespace.

    The namespace holds imported modules, which makes it a convenient
    example of a return value that cannot be pickled.
    """
    namespace = globals()
    return namespace
def waitaround(seconds=3, fail=False):
    """Sleep for about `seconds` seconds, then return a fixed list of words.

    The wait proceeds in steps of at most one second.  When `fail` is true
    and there is still time left to wait, a ZeroDivisionError is raised
    before the first sleep.  A zero or negative `seconds` returns at once.
    """
    remaining = seconds
    # Compare against zero rather than testing truthiness: a negative or
    # fractional count never becomes exactly 0 and would otherwise loop
    # forever.
    while remaining > 0:
        if fail:
            1/0  # deliberate ZeroDivisionError to exercise the failure path
        time.sleep(min(1, remaining))
        remaining -= 1
    return ['here', 'is', 'the', 'dead', 'tree', 'devoid', 'of', 'leaves']
def sysexit():
    """Ask the interpreter to exit with status 9 by raising SystemExit."""
    raise SystemExit(9)
# General test function call
def run(direct, func, *args, **kwargs):
    try:
        print '\nRunning %s(%s, %s) ' % (func.func_name, args, kwargs),
        if direct:
            print 'directly...' 
            result = func(*args, **kwargs)
            print 'Needs minimum mmsize of %d' % (len(cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL)))
        else:
            print 'in separate process...'
            result = run_in_separate_process(Busy, func, *args, **kwargs)
        print '%s returned: %s' % (func.func_name, result)
    except Exception, e:
        print '%s raised %s: %s' % (func.func_name, e.__class__.__name__, str(e))
def main():
    """Run every sample function in turn, forked or in-process, via run()."""
    forked, inline = False, True
    # (direct flag, function, positional arguments, keyword arguments)
    schedule = (
        (forked, waitaround, (), {'seconds': 30}),
        (forked, waitaround, (), {}),
        (forked, waitaround, (), {'fail': True}),
        (forked, toobig, (), {}),
        (forked, nocanpickle, (), {}),
        (forked, suicide, (), {}),
        (inline, waitaround, (), {'seconds': 5}),
        (forked, sysexit, (), {}),
        (forked, treble, (4,), {}),
        (inline, treble, (4,), {}),
    )
    for direct, func, positional, keywords in schedule:
        run(direct, func, *positional, **keywords)
# Run the demonstration only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == '__main__':
    main()
 | 
See Muhammad Alkarouri's version (http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474) for the rationale. This recipe purports to do the same thing, but without blocking. Using a pipe is simpler, but it blocks. An anonymous memory map (i.e., a memory map with no underlying file) is used instead; as far as I know, however, it cannot be resized once the child process finds out the size of the pickled result. If the map is too small, the call fails (status 3). A map size can be specified with the keyword parameter mmsize. This version handles the child process being killed by a signal by raising an exception. The KeyboardInterrupt business should handle forced termination of the target function, but a pid-specific kill is preferable.

 Download
Download Copy to clipboard
Copy to clipboard