
map() applies a function to each item of a sequence, one after another. This recipe is a variation of map() that executes each function call concurrently, in its own thread.

Python, 25 lines
import threading

def concurrent_map(func, data):
    """
    Similar to the built-in function map(), but spawns a thread for each item
    and applies `func` concurrently.

    Note: unlike map(), this does not accept an arbitrary iterable. `data`
    must be an indexable sequence.
    """

    N = len(data)
    result = [None] * N

    # wrapper to store each result in its slot
    def task_wrapper(i):
        result[i] = func(data[i])

    threads = [threading.Thread(target=task_wrapper, args=(i,)) for i in xrange(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    return result

concurrent_map() makes it easy to parallelize simple tasks, such as fetching data from multiple web pages simultaneously. It uses the familiar map() interface to present a straightforward model to the caller.

The code below uses the tool to fetch from multiple URLs. To exaggerate the effect, each URL is fetched 5 times; a substantial speed gain can be observed.

import urllib2

urls = [
    'http://www.python.org/',
    'http://www.w3.org/',
    'http://creativecommons.org/',
    'http://www.archive.org/',
    'http://sourceforge.net/',
]

def fetch(url):
    return urllib2.urlopen(url).read()

# repeat the test 5 times to make the effect apparent
urls = urls * 5

In [416]: %time concurrent_map(fetch, urls)
CPU times: user 0.85 s, sys: 0.00 s, total: 0.85 s
Wall time: 0.85 s
Out[417]:

In [418]: %time map(fetch, urls)
CPU times: user 6.59 s, sys: 0.00 s, total: 6.59 s
Wall time: 6.59 s
Out[419]:

Note that the usual Python multi-threading caveats apply: the GIL prevents CPU-intensive tasks from achieving any speed gain, there is a limit to the number of threads a system can productively spawn, and so on.
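For CPU-bound work, the same map()-style interface is available with processes instead of threads, which sidesteps the GIL. Below is a minimal sketch, not part of the original recipe, using the standard library's multiprocessing.Pool; cpu_heavy() is a made-up function for illustration.

import multiprocessing

def cpu_heavy(n):
    # a deliberately slow, pure-Python computation that the GIL would
    # serialize if it were run on threads
    total = 0
    for i in xrange(n):
        total += i * i
    return total

if __name__ == '__main__':
    data = [10 ** 7] * 8
    pool = multiprocessing.Pool()        # one worker process per CPU core by default
    results = pool.map(cpu_heavy, data)  # same map() interface, but no GIL contention
    pool.close()
    pool.join()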

3 comments

Jacob Biesinger 13 years, 7 months ago

Why not just use multiprocessing.Pool.map?

Wai Yip Tung (author) 13 years, 7 months ago

Good to know there is a multiprocessing.Pool.map. At least I picked the right API interface. I think people will still want threading if they want to share big objects without copying them to each process.
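To make that point concrete, here is a small hedged sketch (big_table and lookup are illustrative names, not part of the recipe): the threads started by concurrent_map() all read the same in-memory object in place, with no pickling of arguments or results and no per-worker copy.

# illustrative only: a large read-only object shared by every thread in place
big_table = dict((i, i * i) for i in xrange(10 ** 6))

def lookup(key):
    # each thread reads the same big_table; nothing is pickled or copied
    return big_table.get(key)

keys = [1, 42, 999999]
print concurrent_map(lookup, keys)   # -> [1, 1764, 999998000001]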

Andrey 7 years, 2 months ago

You probably want to know that your code is still relevant, because not all objects are picklable (for Pool.map). Thanks a lot!