Simple tool for analyzing datasets.
from collections import namedtuple
from math import fsum

def map_reduce(data, mapper, reducer=None):
    '''Simple map/reduce for data analysis.

    Each data element is passed to a *mapper* function.
    The mapper returns key/value pairs
    or None for data elements to be skipped.

    Returns a dict with the data grouped into lists.
    If a *reducer* is specified, it aggregates each list.

    >>> def even_odd(elem):                     # sample mapper
    ...     if 10 <= elem <= 20:                # skip elems outside the range
    ...         key = elem % 2                  # group into evens and odds
    ...         return key, elem
    >>> map_reduce(range(30), even_odd)         # show group members
    {0: [10, 12, 14, 16, 18, 20], 1: [11, 13, 15, 17, 19]}
    >>> map_reduce(range(30), even_odd, sum)    # sum each group
    {0: 90, 1: 75}

    '''
    d = {}
    for elem in data:
        r = mapper(elem)
        if r is not None:
            key, value = r
            if key in d:
                d[key].append(value)
            else:
                d[key] = [value]
    if reducer is not None:
        for key, group in d.items():
            d[key] = reducer(group)
    return d

Summary = namedtuple('Summary', ['n', 'lo', 'mean', 'hi', 'std_dev'])

def describe(data):
    'Simple reducer for descriptive statistics'
    n = len(data)
    lo = min(data)
    hi = max(data)
    mean = fsum(data) / n
    std_dev = (fsum((x - mean) ** 2 for x in data) / n) ** 0.5
    return Summary(n, lo, mean, hi, std_dev)

if __name__ == '__main__':
    from pprint import pprint
    import doctest

    Person = namedtuple('Person', ['name', 'gender', 'age', 'height'])
    persons = [
        Person('mary', 'fem', 21, 60.2),
        Person('suzy', 'fem', 32, 70.1),
        Person('jane', 'fem', 27, 58.1),
        Person('jill', 'fem', 24, 69.1),
        Person('bess', 'fem', 43, 66.6),
        Person('john', 'mal', 25, 70.8),
        Person('jack', 'mal', 40, 59.1),
        Person('mike', 'mal', 42, 60.3),
        Person('zack', 'mal', 45, 63.7),
        Person('alma', 'fem', 34, 67.0),
        Person('bill', 'mal', 20, 62.1),
    ]

    def height_by_gender_and_agegroup(p):
        key = p.gender, p.age // 10
        val = p.height
        return key, val

    pprint(persons)                                                        # ungrouped dataset
    pprint(map_reduce(persons, lambda p: ((p.gender, p.age//10), p)))      # grouped people
    pprint(map_reduce(persons, height_by_gender_and_agegroup, None))       # grouped heights
    pprint(map_reduce(persons, height_by_gender_and_agegroup, len))        # size of each group
    pprint(map_reduce(persons, height_by_gender_and_agegroup, describe))   # describe each group
    print(doctest.testmod())
Provides minimal pivot-table and crosstab capabilities.
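For example, the group sizes from the demo can be laid out as a crosstab with a couple of formatting lines. This is a minimal sketch reusing the names from the __main__ section above; the printing loop is just one way to arrange the resulting dict, not part of the recipe itself:

    counts = map_reduce(persons, height_by_gender_and_agegroup, len)
    genders = sorted({g for g, d in counts})                        # row labels
    decades = sorted({d for g, d in counts})                        # column labels
    print(' ' * 6 + ''.join('%6d' % (d * 10) for d in decades))    # header: age decades
    for g in genders:
        print('%-6s' % g + ''.join('%6d' % counts.get((g, d), 0) for d in decades))

which prints:

              20    30    40
    fem        3     2     1
    mal        2     0     3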
The recipe can also be implemented using collections.defaultdict(). The current implementation was chosen for clarity and to simplify the signature (users expect map/reduce to return a regular dict). Another goal was to use dirt-simple Python for the map_reduce() function.
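For comparison, a defaultdict version might look like the sketch below (map_reduce_dd is a hypothetical name; the final dict() call preserves the plain-dict return type mentioned above):

    from collections import defaultdict

    def map_reduce_dd(data, mapper, reducer=None):
        'Same behavior as map_reduce(), with defaultdict handling the grouping.'
        d = defaultdict(list)
        for elem in data:
            r = mapper(elem)
            if r is not None:
                key, value = r
                d[key].append(value)                # no key-present check needed
        if reducer is not None:
            return {key: reducer(group) for key, group in d.items()}
        return dict(d)                              # callers still get a regular dict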
The recipe uses math.fsum() instead of the built-in sum() to make sure precision isn't lost when averaging a large dataset full of nearly equal values.
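A contrived illustration of the difference, in the doctest style used above:

    >>> from math import fsum
    >>> sum([1e16, 1.0, -1e16])     # the 1.0 is lost to intermediate rounding
    0.0
    >>> fsum([1e16, 1.0, -1e16])    # fsum() tracks partial sums without rounding
    1.0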
Named tuples are used for code clarity but aren't essential to the map_reduce() recipe.
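The Summary result is still an ordinary tuple underneath, so positional access keeps working; the named fields are purely a readability aid:

    >>> s = describe([1, 2, 3, 4, 5])
    >>> s.mean                      # field access by name
    3.0
    >>> s[2] == s.mean              # Summary behaves like a plain tuple too
    True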
Have you tried to test this piece of code to make sure it actually runs?
Devy, thanks for the note. Added the missing import for fsum(). It was imported in my test script but not in the posted version.
Nice recipe - thanks for sharing it.
Is there a reason not to use defaultdict? The code seems a bit cleaner, and a rough test suggests a slight performance improvement.
I came looking for the use of separate processes or maybe threads, but it seems this recipe doesn't use any parallelism. Never mind.