Welcome, guest | Sign In | My Account | Store | Cart

A procedure that runs a function asynchronously in a forked process (Availability: Macintosh, Unix). The return from the specified function is written into an anonymous memory map (mmap: requires +Python 2.5). This can be useful for releasing resources used by the function such as memory, updating a gui or cli widget, or other weirdness.

Python, 130 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python

import sys, os, cPickle, time, mmap

TICKS = ('|', '/', '\\')
ESC = chr(27)

class Busy(object):

    def __init__(self, pid):
        self.tick = -1
        sys.stdout.write('Running Child(pid:%d)...%s[s' % (pid, ESC))
        self.next()

    def next(self):
        self.tick += 1
        sys.stdout.write('%s[K %s%s[u' % (ESC, TICKS[self.tick%3], ESC))
        sys.stdout.flush()

    def stop(self, status):
        sys.stdout.write('Done(status: %d)\n' % status)

class ForkedProcessException(Exception):
    pass


def run_in_separate_process(waitclass, func, *args, **kwds):
    try:
        mmsize = kwds['mmsize']
        del kwds['mmsize']
        mmsize = max(mmsize, 1024)
    except KeyError:
        mmsize = 1024
    mm = mmap.mmap(-1, mmsize) 
    pid = os.fork()
    if pid != 0:
        # the parent process
        busy = waitclass(pid)
        try:
            while 1:
                busy.next()
                wpid, wstatus = os.waitpid(pid, os.WNOHANG)
                if wpid == pid:
                    break
        except KeyboardInterrupt:
            raise ForkedProcessException('User cancelled!')
        if os.WIFEXITED(wstatus):
            status = os.WEXITSTATUS(wstatus)
            busy.stop(status)
        elif os.WIFSIGNALED(wstatus):
            raise ForkedProcessException('Child killed by signal: %d' % os.WTERMSIG(wstatus))
        else:
            raise RuntimeError('Unknown child exit status!')
        mm.seek(0)
        result = cPickle.load(mm)
        if status  == 0:
            return result
        else:
            raise result
    else: # the child process 
        try:
            mm.seek(0)
            result = func(*args, **kwds)
            status = 0 # success
            cPickle.dump(result, mm, cPickle.HIGHEST_PROTOCOL)
        except cPickle.PicklingError, exc:
            status = 2 # failure
            cPickle.dump(exc, mm, cPickle.HIGHEST_PROTOCOL)
        except (KeyboardInterrupt), exc:
            status = 4 # failure
            cPickle.dump(ForkedProcessException('User cancelled!'), mm, cPickle.HIGHEST_PROTOCOL)
        except ValueError:
            status = 3 # failure
            pstr = cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL)
            mm.seek(0)
            cPickle.dump(ForkedProcessException('mmsize: %d, need: %d' % (mmsize, len(pstr))), mm, cPickle.HIGHEST_PROTOCOL)
        except (Exception), exc:
            status = 1 # failure
            cPickle.dump(exc, mm, cPickle.HIGHEST_PROTOCOL)
        os._exit(status)

# Functions to run in a separate process
def treble(x, fail=False):
    if fail: 1/0
    return 3 * x
def suicide():
    os.kill(os.getpid(), 15)
def toobig():
    return '1234567890' * 110
def nocanpickle():
    return globals()
def waitaround(seconds=3, fail=False):
    while seconds:
        if fail: 1/0
        time.sleep(1)
        seconds -= 1
    return ['here', 'is', 'the', 'dead', 'tree', 'devoid', 'of', 'leaves']
def sysexit():
    sys.exit(9)

# General test function call
def run(direct, func, *args, **kwargs):
    try:
        print '\nRunning %s(%s, %s) ' % (func.func_name, args, kwargs),
        if direct:
            print 'directly...' 
            result = func(*args, **kwargs)
            print 'Needs minimum mmsize of %d' % (len(cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL)))
        else:
            print 'in separate process...'
            result = run_in_separate_process(Busy, func, *args, **kwargs)
        print '%s returned: %s' % (func.func_name, result)
    except Exception, e:
        print '%s raised %s: %s' % (func.func_name, e.__class__.__name__, str(e))

def main():
    direct = True
    run(not direct, waitaround, seconds=30)
    run(not direct, waitaround)
    run(not direct, waitaround, fail=True)
    run(not direct, toobig)
    run(not direct, nocanpickle)
    run(not direct, suicide)
    run(direct, waitaround, seconds=5)
    run(not direct, sysexit)
    run(not direct, treble, 4)
    run(direct, treble, 4)

if __name__ == '__main__':
    main()

See Muhammad Alkarouri's version (http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474) for the rationale. This recipe purports to do the same thing, but without blocking. Using a pipe is simpler, but it blocks. An anonymous memory map (i.e., a memory map with no underlying file) is used instead, but as far as I know, cannot be resized when the child process finds out the size of the pickled result. If the map is too small, it fails (status 3). A map size can be specified by the keyword parameter mmsize. This version handles killing the child process via signal, by raising an exception. The KeyboardInterrupt business should handle forced termination of the target function, but a pid-specific kill is preferable.