Performs a merge join on any number of iterators with sorted, unique keys. That is, it iterates through the sources in parallel, yielding each successive key together with the corresponding value from every iterator, or None where an iterator has no entry for that key.
"""For data stored by a sorted key, perform a merge join over parallel
iteration through multiple data sources.

Author: Joel Nothman, me@joelnothman.com

Some sequences to join:

>>> fathers = [
...     ('Abe', 'Jim'),
...     ('Benny', 'John'),
...     ('David', 'Jacob'),
...     ('Evan', 'Jonas'),
... ]
>>> mothers = [
...     ('Abe', 'Michelle'),
...     ('Benny', 'Mary'),
...     ('Caleb', 'Madeline'),
...     ('Evan', 'Marna'),
... ]
>>> phones = [
...     ('Benny', '000-000-0002'),
...     ('David', '000-000-0004'),
... ]

We wish to retrieve row data combining each of these columns:

>>> for t in merge_join(fathers, mothers, phones):
...     print(t)
('Abe', 'Jim', 'Michelle', None)
('Benny', 'John', 'Mary', '000-000-0002')
('Caleb', None, 'Madeline', None)
('David', 'Jacob', None, '000-000-0004')
('Evan', 'Jonas', 'Marna', None)
"""
def _first_iter_vals(iters):
    """Generate the first values of each iterator."""
    for it in iters:
        try:
            # next(it) works on Python 2.6+ and Python 3, unlike it.next()
            yield next(it)
        except StopIteration:
            yield None


def _merge_join_next(iters, cur_pairs):
    """Generate the values of the next tuple (key, v1, ..., vn) by finding the
    minimum key available, and returning the corresponding values where
    available while advancing the respective iterators."""
    # Find the next key, or quit if all pairs are None
    try:
        min_key = min(p[0] for p in cur_pairs if p)
    except ValueError:
        return

    # Yield the key as the first tuple element
    yield min_key

    for i, (it, p) in enumerate(zip(iters, cur_pairs)):
        try:
            k, v = p
        except TypeError:
            # p is None => the iterator has stopped
            yield None
            continue

        if k != min_key:
            # No data for this key
            yield None
            continue

        # Yes data for this key: yield it
        yield v

        # Update cur_pairs for this iterator
        try:
            cur_pairs[i] = next(it)
        except StopIteration:
            cur_pairs[i] = None


def merge_join(*iters):
    """Given a series of n iterators whose data are of form ``(key, value)``,
    where the keys are sorted and unique for each iterator, generates tuples
    ``(key, val_1, val_2, ..., val_n)`` for all keys, where ``val_i`` is the
    value corresponding to ``key`` in the ``i``th iterator, or None if no such
    pair exists for the ``i``th iterator."""
    iters = [iter(it) for it in iters]
    cur_pairs = list(_first_iter_vals(iters))
    while True:
        tup = tuple(_merge_join_next(iters, cur_pairs))
        if not tup:
            # An empty tuple means every iterator is exhausted
            return
        yield tup


if __name__ == "__main__":
    import doctest
    doctest.testmod()
This is useful for data stored by columns rather than rows, where some keys are not present in all columns. For example, a number of independent processes run over a large collection of sorted items may each produce a file containing every item's key and some processing result. While these output files are rarely all required at once, it is often useful to iterate through the combined data from all files for a given key.
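The file scenario above might look roughly like the following sketch. The tab-separated layout, the `read_pairs` helper, and the sample data are all assumptions made for illustration, with `io.StringIO` objects standing in for real output files. So that the block runs on its own, it carries a compact stand-in for `merge_join` that produces the same row shape as the recipe; it is not the recipe's code.

```python
import io


def read_pairs(lines):
    """Parse 'key<TAB>value' lines into (key, value) pairs (assumed format)."""
    for line in lines:
        key, value = line.rstrip("\n").split("\t", 1)
        yield key, value


def merge_join(*iters):
    """Compact stand-in for the recipe's merge_join (same output shape)."""
    iters = [iter(it) for it in iters]
    cur = [next(it, None) for it in iters]  # None marks an exhausted source
    while any(p is not None for p in cur):
        min_key = min(p[0] for p in cur if p is not None)
        row = [min_key]
        for i, p in enumerate(cur):
            if p is not None and p[0] == min_key:
                row.append(p[1])
                cur[i] = next(iters[i], None)  # advance only matching sources
            else:
                row.append(None)
        yield tuple(row)


# Hypothetical per-process output files, simulated in memory
lengths = io.StringIO("apple\t5\nbanana\t6\ncherry\t6\n")
colours = io.StringIO("apple\tred\ncherry\tdark red\n")

rows = list(merge_join(read_pairs(lengths), read_pairs(colours)))
for row in rows:
    print(row)
```

Because each source is read lazily, only one pending pair per file is held in memory at any time, which is the point of doing a merge join on large sorted files.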
Similar merges are available for the case of only two iterators: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/492216
Some preprocessing on iterators may also be useful:
- if keys in some iterator it are not unique, group them by key with: itertools.groupby(it, key=lambda pair: pair[0]) (without a key function, groupby compares whole pairs rather than keys)
- if tuples involved in the merge have more than one value for each key (as is the case in the linked recipe), use: ((t[0], t[1:]) for t in it)
- if key-value pairs for some iterator originate in a dict, use: sorted(mydict.items())
- if keys can be generated from the data by a function, use: ((fn(d), d) for d in it)
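The preprocessing hints above can be sketched as small adapters, each turning some input into sorted `(key, value)` pairs with unique keys. All sample data and variable names here are made up for illustration. Passing a key function to `groupby` and copying each group into a list are assumptions about what the first hint intends: grouping on whole pairs would not deduplicate keys, and `groupby` groups are lazy iterators that are emptied as soon as the underlying iterator advances.

```python
from itertools import groupby
from operator import itemgetter

# Non-unique keys: group on the key and materialize each group's values
dupes = [("a", 1), ("a", 2), ("b", 3)]
unique = [(k, [v for _, v in g]) for k, g in groupby(dupes, key=itemgetter(0))]

# Several values per key: fold the trailing fields into one value
wide = [("a", 1, "x"), ("b", 2, "y")]
pairs_from_wide = [(t[0], t[1:]) for t in wide]

# Pairs originating in a dict: sort the items by key
mydict = {"b": 2, "a": 1}
pairs_from_dict = sorted(mydict.items())

# Keys derived from the data by a function (here len); the input must
# already be ordered by that derived key for a merge join to work
words = ["fig", "apple", "banana"]
keyed = [(len(w), w) for w in words]

print(unique)
print(pairs_from_wide)
print(pairs_from_dict)
print(keyed)
```

The list comprehensions are used only so the results can be printed; with large inputs the same expressions would normally be written as generators.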
I didn't know you could terminate a generator by raising an OutOfDataException - but I don't see the advantage of doing this over a simple "return" (which also ends the generator and doesn't create the exception object).
Edited accordingly. Thanks.
No, a return statement there would not be appropriate.
Intuitively, this algorithm returns a table. A return statement instead of an exception there would end the /row/ rather than the whole table.
But I guess it would be valid to return there and then test for an empty tuple.
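A minimal sketch of the point settled in this thread, using hypothetical `next_row` and `table` functions rather than the recipe's own: a bare return inside the row generator ends only that row, so the outer generator has to notice the resulting empty tuple in order to end the whole table.

```python
def next_row(pending):
    """Yield the elements of one row, or nothing when no rows remain."""
    if not pending:
        return  # ends this row generator only, not the table
    for item in pending.pop(0):
        yield item


def table(rows):
    """Yield rows as tuples, stopping at the first empty one."""
    pending = list(rows)
    while True:
        row = tuple(next_row(pending))
        if not row:
            return  # an empty row signals that the data has run out
        yield row


result = list(table([("a", 1), ("b", 2)]))
print(result)
```

The empty-tuple test is safe in the recipe because every real row starts with its key, so a genuine row is never empty.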