map() applies a function to a list of data sequentially. This is a variation on map() that executes each function call concurrently, each in its own thread.
import threading

def concurrent_map(func, data):
    """
    Similar to the builtin function map(), but spawn a thread for each
    argument and apply `func` concurrently.

    Note: unlike map(), we cannot take an iterable argument. `data` should be
    an indexable sequence.
    """
    N = len(data)
    result = [None] * N

    # wrapper to deposit each result in the right slot
    def task_wrapper(i):
        result[i] = func(data[i])

    # range() works on both Python 2 and Python 3
    threads = [threading.Thread(target=task_wrapper, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    return result
concurrent_map() makes it easy to parallelize simple tasks, such as fetching data from multiple web pages simultaneously. It uses the familiar map() interface to present a straightforward model to the caller.
The code below uses the tool to fetch from multiple URLs. To exaggerate the effect, each URL is fetched 5 times. A substantial speed gain can be observed.
import urllib2  # Python 2; on Python 3 use urllib.request instead

urls = [
    'http://www.python.org/',
    'http://www.w3.org/',
    'http://creativecommons.org/',
    'http://www.archive.org/',
    'http://sourceforge.net/',
]

def fetch(url):
    return urllib2.urlopen(url).read()

# repeat the test 5 times to make the effect apparent
urls = urls * 5
In [416]: %time concurrent_map(fetch, urls)
CPU times: user 0.85 s, sys: 0.00 s, total: 0.85 s
Wall time: 0.85 s
Out[417]:
In [418]: %time map(fetch, urls)
CPU times: user 6.59 s, sys: 0.00 s, total: 6.59 s
Wall time: 6.59 s
Out[419]:
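The comparison can also be reproduced without network access. The following is a minimal sketch, assuming Python 3; `slow_square` and `timed` are hypothetical helpers introduced only for this illustration, and the `time.sleep()` call stands in for network latency. `concurrent_map()` is repeated so the snippet runs on its own.

```python
import threading
import time


def concurrent_map(func, data):
    """Thread-per-item map, same approach as the function above."""
    result = [None] * len(data)

    def task_wrapper(i):
        result[i] = func(data[i])

    threads = [threading.Thread(target=task_wrapper, args=(i,))
               for i in range(len(data))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return result


def slow_square(x):
    # Hypothetical I/O-bound task: the sleep stands in for a slow request.
    time.sleep(0.1)
    return x * x


def timed(call):
    # Run a zero-argument callable and return (result, elapsed seconds).
    start = time.time()
    value = call()
    return value, time.time() - start


numbers = list(range(10))
concurrent_result, concurrent_secs = timed(
    lambda: concurrent_map(slow_square, numbers))
serial_result, serial_secs = timed(
    lambda: [slow_square(n) for n in numbers])

print("concurrent: %.2f s, serial: %.2f s" % (concurrent_secs, serial_secs))
```

Because the threads spend their time sleeping rather than computing, the concurrent run should take roughly as long as a single call, while the serial run takes about ten times that.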
Note that the usual Python multi-threading caveats apply. For example, the GIL will prevent CPU-intensive tasks from achieving any speed gain, and there is a limit to the number of threads a system can productively spawn.
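One way to address the thread-count limit is to cap the number of worker threads instead of spawning one per item. The sketch below is not the function from this post; it swaps in the standard library's `concurrent.futures.ThreadPoolExecutor`, which assumes Python 3.2 or later. The names `bounded_concurrent_map` and `fake_fetch`, and the default of 8 workers, are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def bounded_concurrent_map(func, data, max_workers=8):
    """Like concurrent_map(), but uses at most `max_workers` threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map() yields results in the order of the inputs.
        return list(pool.map(func, data))


def fake_fetch(item):
    # Hypothetical I/O-bound task; records which thread handled the item.
    time.sleep(0.05)
    return (item, threading.current_thread().name)


results = bounded_concurrent_map(fake_fetch, range(20), max_workers=4)
items = [item for item, _ in results]
thread_names = set(name for _, name in results)

print(items)
print(len(thread_names))
```

With 20 items and 4 workers, the work is spread over no more than 4 threads, and the results still come back in input order. Unlike concurrent_map(), this version also accepts any iterable.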
Why not just use multiprocessing.Pool.map?
Good to know there is a multiprocessing.Pool.map. At least I have picked the right API interface. I think people still want threading if they want to share big objects without copying them to each process.
You probably want to know that your code is still relevant, because not all objects are picklable (for Pool.map). Thanks a lot!
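To make the last two points concrete, here is a small sketch, assuming Python 3. The `Counter` class and `bump_shared` function are hypothetical. A `Counter` holds a `threading.Lock`, so it cannot be pickled and therefore could not be passed as an argument through Pool.map, yet every thread started by concurrent_map() can work on the same instance without copying it.

```python
import pickle
import threading


def concurrent_map(func, data):
    """Thread-per-item map, same approach as the function in the post."""
    result = [None] * len(data)

    def task_wrapper(i):
        result[i] = func(data[i])

    threads = [threading.Thread(target=task_wrapper, args=(i,))
               for i in range(len(data))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return result


class Counter(object):
    """Hypothetical shared object; the lock it holds is not picklable."""

    def __init__(self):
        self.lock = threading.Lock()
        self.value = 0

    def bump(self):
        # Increment and read under the lock so each call sees its own value.
        with self.lock:
            self.value += 1
            return self.value


def bump_shared(counter):
    return counter.bump()


shared = Counter()

# Check whether the object could be sent to another process at all.
try:
    pickle.dumps(shared)
    picklable = True
except TypeError:
    picklable = False

# All five threads receive the very same Counter instance.
values = concurrent_map(bump_shared, [shared] * 5)

print(picklable)
print(sorted(values))
```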