
Implementation of parallel map (processes).

This is a Unix-only version, since it relies on os.fork().

#!/usr/bin/env python
'''Parallel map (Unix only)'''

__author__ = "Miki Tebeka <miki.tebeka@gmail.com>"

# The big advantage of this implementation is that "fork" is very fast at
# copying data, so if you pass big arrays as arguments and return small values,
# this is a win.

# FIXME 
# * For too many items, we get "Too many open files"
# * Handle child exceptions

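import os
from marshal import dump, load

def spawn(func, data):
    '''Fork a child that computes func(data) and sends the result over a pipe.'''
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid:  # Parent: keep only the read end
        os.close(write_fd)
        return pid, os.fdopen(read_fd, "rb")

    # Child: keep only the write end, send the result and exit immediately.
    # os._exit() guarantees the child never returns into the parent's code,
    # even if func raises (the parent then gets EOFError from load).
    status = 1
    try:
        os.close(read_fd)
        with os.fdopen(write_fd, "wb") as write_fo:
            dump(func(data), write_fo)
        status = 0
    finally:
        os._exit(status)

def wait(child):
    '''Read a child's result, then reap the child.'''
    pid, fo = child
    try:
        # Read *before* waiting: a child whose result is bigger than the pipe
        # buffer blocks on write until we read, so waiting first would deadlock.
        with fo:
            return load(fo)
    finally:
        os.waitpid(pid, 0)

def pmap(func, items):
    '''
    >>> pmap(lambda x: x * 2, range(5))
    [0, 2, 4, 6, 8]
    '''
    # Fork every child first (a list, not a lazy iterator) so they all run
    # in parallel, then collect the results in order.
    children = [spawn(func, item) for item in items]
    return [wait(child) for child in children]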

if __name__ == "__main__":
    def fib(n):
        a, b = 1, 1
        while n > 1:
            a, b = b, a + b
            n -= 1
        return b

    items = list(range(10))
    print("pmap(fib, %s)" % items)
    print(pmap(fib, items))

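The big advantage of this implementation is that "fork" is very fast at copying data: the child gets the parent's memory (shared copy-on-write on modern Unix systems) instead of having its arguments serialized and sent to it. So if you pass big arrays as arguments and return small values, this is a win, since only the small return value has to travel back over the pipe.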
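A minimal sketch of that trade-off, isolated from the recipe: the hypothetical child_sum() forks a child that reads a large list straight out of the memory it inherited from the parent, and writes back only its sum (a few bytes of marshal data) over a pipe. The list itself is never serialized.

```python
import marshal
import os

def child_sum(big):
    """Sum `big` in a forked child; only the small result crosses the pipe."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: `big` is already in this process's memory thanks to fork,
        # so nothing was serialized to get it here.
        try:
            os.close(read_fd)
            os.write(write_fd, marshal.dumps(sum(big)))  # a small int: a few bytes
        finally:
            os._exit(0)  # never return into the parent's code
    # Parent: read the result first, then reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as fo:
        result = marshal.load(fo)
    os.waitpid(pid, 0)
    return result
```

For comparison, len(pickle.dumps(big)) gives a rough idea of how many bytes an approach that ships the argument to a worker would have to serialize, while len(marshal.dumps(child_sum(big))) is what actually travels back here.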

Problems:

  • If too many processes are spawned at once, we get a "Too many open files" error.
  • There is no way to catch exceptions raised in a child process.
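One possible way around both problems, sketched under assumptions: the hypothetical pmap_safe() below forks at most max_children processes at a time, so the number of open pipes stays bounded, and each child sends back either (True, result) or (False, formatted traceback), which the parent re-raises as RuntimeError. It swaps marshal for pickle so that results of more types can be returned. The batch size, the helper names and the error format are illustrative choices, not part of the original recipe.

```python
import os
import pickle
import traceback

def _spawn(func, item):
    """Fork a child that pickles (True, func(item)) or (False, traceback) to a pipe."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid:  # Parent: keep only the read end
        os.close(write_fd)
        return pid, read_fd
    # Child: whatever happens, never return into the caller's code
    try:
        os.close(read_fd)
        try:
            payload = (True, func(item))
        except Exception:
            payload = (False, traceback.format_exc())
        with os.fdopen(write_fd, "wb") as fo:
            pickle.dump(payload, fo)
    finally:
        os._exit(0)

def _collect(child):
    """Read one child's payload, then reap it so no zombie is left behind."""
    pid, read_fd = child
    try:
        with os.fdopen(read_fd, "rb") as fo:
            return pickle.load(fo)
    except (EOFError, pickle.UnpicklingError) as exc:
        return False, "child %d sent no result: %r" % (pid, exc)
    finally:
        os.waitpid(pid, 0)

def pmap_safe(func, items, max_children=32):
    """Parallel map that forks at most max_children processes at a time."""
    items = list(items)
    results = []
    for start in range(0, len(items), max_children):
        batch = [_spawn(func, item) for item in items[start:start + max_children]]
        outcomes = [_collect(child) for child in batch]  # reap the whole batch first
        for ok, value in outcomes:
            if not ok:
                raise RuntimeError("child process failed:\n" + value)
            results.append(value)
    return results
```

Working in fixed batches is the simplest way to bound concurrency; a sliding window that forks a new child as soon as one finishes would keep more cores busy, at the cost of more bookkeeping.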

2 comments

Agnius Vasiliauskas 14 years, 11 months ago

You should use the built-in multiprocessing package (unless you are running Python < 2.6):

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:    # How many processes to start; usually the number of cores/processors
        print(pool.map(f, range(10)))  # Parallel map

http://docs.python.org/dev/library/multiprocessing.html

Miki Tebeka (author) 11 years, 1 month ago

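As said in the comment in the code, the advantage over multiprocessing is that "fork" copies data much faster: Pool.map has to pickle every argument and send it to a worker process, while a forked child simply inherits the parent's memory. So if you're passing a lot of data to the child processes, this will be much faster.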
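The two approaches can also be combined. A minimal sketch, assuming a Unix system where multiprocessing's "fork" start method is available: store the large data in a module-level global before the pool is created, so the forked workers inherit it, and send the workers only small (start, stop) index pairs. The names BIG, chunk_sum() and parallel_sum() are hypothetical.

```python
import multiprocessing as mp

BIG = None  # hypothetical large, read-only data; set before the pool forks

def chunk_sum(bounds):
    start, stop = bounds
    # Reads the inherited BIG; only `bounds` and the partial sum are pickled.
    return sum(BIG[start:stop])

def parallel_sum(data, nchunks=4):
    global BIG
    BIG = data  # must happen before the workers are forked
    step = max(1, -(-len(data) // nchunks))  # ceiling division
    bounds = [(i, min(i + step, len(data))) for i in range(0, len(data), step)]
    ctx = mp.get_context("fork")  # Unix only; workers get a copy-on-write view of BIG
    with ctx.Pool(processes=nchunks) as pool:
        return sum(pool.map(chunk_sum, bounds))
```

The explicit get_context("fork") matters: with the "spawn" start method the workers would re-import the module and see BIG as None.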