ActiveState Code

Recipe 576709: Parallel map


Implementation of parallel map (processes).

This version is Unix-only because it relies on os.fork.

Python
#!/usr/bin/env python
'''Parallel map (Unix only)'''

__author__ = "Miki Tebeka <miki.tebeka@gmail.com>"

# The big advantage of this implementation is that "fork" does not serialize
# the arguments: the child inherits the parent's memory (typically shared
# copy-on-write), so if you pass big arrays as arguments and return small
# values this is a win

# FIXME 
# * For too many items, we get "Too many open files"
# * Handle child exceptions

from os import fork, pipe, fdopen, waitpid, P_WAIT
from marshal import dump, load
from itertools import takewhile, count

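def spawn(func, data):
    read_fo, write_fo = map(fdopen, pipe(), ("rb", "wb"))
    pid = fork()
    if pid: # Parent
        write_fo.close() # Parent only reads; frees one descriptor per child
        return pid, read_fo

    # Child
    read_fo.close() # Child only writes
    dump(func(data), write_fo)
    write_fo.close()
    raise SystemExit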

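def wait(child):
    pid, fo = child
    # Read before waiting: a child blocked on a full pipe would never exit
    result = load(fo)
    fo.close()
    # P_WAIT is a mode for os.spawn*; waitpid takes options, 0 means "block"
    waitpid(pid, 0)
    return result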

def pmap(func, items):
    '''
    >>> pmap(lambda x: x * 2, range(5))
    [0, 2, 4, 6, 8]
    '''
    children = map(lambda item: spawn(func, item), items)
    return map(wait, children)

if __name__ == "__main__":
    def fib(n):
        a, b = 1, 1
        while n > 1:
            a, b = b, a + b
            n -= 1
        return b

    items = range(10)
    print "pmap(fib, %s)" % str(items)
    print pmap(fib, items)

Discussion

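The big advantage of this implementation is that "fork" makes the arguments available to the child without serializing them: the child starts with a copy of the parent's memory, which most Unix systems share copy-on-write. Only the return value is marshalled back through a pipe, so passing big arrays as arguments and returning small values is a win.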
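To make that point concrete, here is a minimal sketch (written for Python 3, unlike the recipe itself) in which a large list is built once in the parent and read directly by a forked child. The helper name `run_in_child` is hypothetical and not part of the recipe; only the sum travels back through the pipe.

```python
import marshal
import os

def run_in_child(func):
    """Run func() in a forked child and return its (small) result."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # Child: starts with the parent's memory, nothing is pickled
        try:
            os.close(read_fd)
            with os.fdopen(write_fd, "wb") as out:
                marshal.dump(func(), out)
        finally:
            os._exit(0)  # Never fall through into the parent's code
    # Parent: read the result first, then reap the child
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as inp:
        result = marshal.load(inp)
    os.waitpid(pid, 0)
    return result

big = list(range(1000000))  # Built once in the parent, never serialized

# The closure reads `big` in the child; only the integer sum crosses the pipe.
total = run_in_child(lambda: sum(big))
print(total)
```

With multiprocessing-style argument passing, `big` would have to be pickled and sent to the worker; here the cost is dominated by the fork itself and the small return value.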

Problems:

  • If too many processes are spawned at once, we get a "Too many open files" error, because the parent holds an open pipe for every child.
  • There is no way to catch exceptions raised in a child process.
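One possible way to address both problems is sketched below, for Python 3. It is not the author's fix: the names `pmap_bounded`, `max_children` and `ChildError` are hypothetical. Children are spawned in batches so that only a bounded number of pipes is open at a time, and each child sends back either its result or the text of its traceback.

```python
import marshal
import os
import traceback

class ChildError(Exception):
    """Raised in the parent when func failed in a child (hypothetical name)."""

def spawn(func, item):
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid:  # Parent keeps only the read end
        os.close(write_fd)
        return pid, os.fdopen(read_fd, "rb")
    # Child: send (True, result) or (False, traceback text), then exit
    try:
        os.close(read_fd)
        try:
            payload = (True, func(item))
        except Exception:
            payload = (False, traceback.format_exc())
        with os.fdopen(write_fd, "wb") as out:
            marshal.dump(payload, out)
    finally:
        os._exit(0)  # Never fall through into the parent's code

def wait(child):
    pid, fo = child
    with fo:
        outcome = marshal.load(fo)  # Read before waitpid: avoids a full-pipe deadlock
    os.waitpid(pid, 0)
    return outcome

def pmap_bounded(func, items, max_children=4):
    """Like pmap, but with at most max_children processes and pipes open."""
    items = list(items)
    results = []
    for start in range(0, len(items), max_children):
        batch = [spawn(func, item) for item in items[start:start + max_children]]
        outcomes = [wait(child) for child in batch]  # Reap the whole batch first
        for ok, value in outcomes:
            if not ok:
                raise ChildError(value)
            results.append(value)
    return results

print(pmap_bounded(lambda x: x * 2, range(5)))
```

Batching is the simplest bound but not the most efficient one, since each batch waits for its slowest child; a pool that starts a new child whenever one finishes would keep all slots busy.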

Comments

  1. At 10:13 a.m. on 13 Apr 2009, Agnius Vasiliauskas said:

    You should use the integrated multiprocessing package (unless you run Python < 2.6):

    from multiprocessing import Pool
    def f(x): return x*x
    pool = Pool(processes=4)      # Number of processes to start; should equal the number of cores/processors
    print pool.map(f, range(10))  # Parallel map
    

    http://docs.python.org/dev/library/multiprocessing.html
