
Performs a merge join on any number of iterators with sorted, unique keys: iterate through the data sources in parallel, yielding each successive key together with the corresponding value from each iterator, or None where an iterator has no entry for that key.

Python, 99 lines
"""For data stored by a sorted key, perform a merge join over parallel
iteration through multiple data sources.

Author: Joel Nothman, me@joelnothman.com

Some sequences to join:
>>> fathers = [
...     ('Abe', 'Jim'),
...     ('Benny', 'John'),
...     ('David', 'Jacob'),
...     ('Evan', 'Jonas'),
... ]
>>> mothers = [
...     ('Abe', 'Michelle'),
...     ('Benny', 'Mary'),
...     ('Caleb', 'Madeline'),
...     ('Evan', 'Marna'),
... ]
>>> phones = [
...     ('Benny', '000-000-0002'),
...     ('David', '000-000-0004'),
... ]

We wish to retrieve row data combining each of these columns:

>>> for t in merge_join(fathers, mothers, phones):
...     print(t)
('Abe', 'Jim', 'Michelle', None)
('Benny', 'John', 'Mary', '000-000-0002')
('Caleb', None, 'Madeline', None)
('David', 'Jacob', None, '000-000-0004')
('Evan', 'Jonas', 'Marna', None)

"""

def _first_iter_vals(iters):
    """Generate the first values of each iterator."""
    for it in iters:
        try:
            yield next(it)
        except StopIteration:
            yield None


def _merge_join_next(iters, cur_pairs):
    """Generate the values of the next tuple (key, v1, ..., vn) by finding the
    minimum key available, and returning the corresponding values where
    available while advancing the respective iterators."""

    # Find the next key, or quit if all keys are None
    try:
        min_key = min(p[0] for p in cur_pairs if p)
    except ValueError:
        return

    # Yield the key as the first tuple element
    yield min_key

    for i, (it, p) in enumerate(zip(iters, cur_pairs)):
        try:
            k, v = p
        except TypeError:
            # p is None => the iterator has stopped
            yield None
            continue

        if k != min_key:
            # No data for this key
            yield None
            continue

        # Data exists for this key: yield it
        yield v

        # Update cur_pairs for this iterator
        try:
            cur_pairs[i] = next(it)
        except StopIteration:
            cur_pairs[i] = None


def merge_join(*iters):
    """Given a series of n iterators whose data are of form ``(key, value)``,
    where the keys are sorted and unique for each iterator, generates tuples
    ``(key, val_1, val_2, ..., val_n)`` for all keys, where ``val_i`` is the
    value corresponding to ``key`` in the ``i``th iterator, or None if no such
    pair exists for the ``i``th iterator."""

    iters = [iter(it) for it in iters]
    cur_pairs = list(_first_iter_vals(iters))
    while True:
        tup = tuple(_merge_join_next(iters, cur_pairs))
        if not tup:
            return
        yield tup

if __name__ == "__main__":
    import doctest
    doctest.testmod()

This is useful for data stored by columns rather than rows, where some keys are not present in all columns. For example, a number of independent processes over a large collection of sorted items may each produce a file pairing an item's key with some processing result. While these output files are rarely all required at once, it is often useful to iterate through the combined data from all files for a given key.
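As a sketch of that use case (the file names and the tab-separated layout are hypothetical), each output file can be wrapped as a (key, value) iterator and the wrapped files fed to merge_join:

```python
def kv_lines(path):
    """Yield (key, value) pairs from a file of sorted, tab-separated lines."""
    with open(path) as f:
        for line in f:
            key, _, value = line.rstrip('\n').partition('\t')
            yield key, value

# Combine per-process result files by key (file names are illustrative):
# for row in merge_join(kv_lines('lengths.tsv'), kv_lines('checksums.tsv')):
#     print(row)
```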

Similar merges are available for the case of only two iterators: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/492216

Some preprocessing of the iterators may also be useful:

- if keys in some iterator it are not unique, group the values per key first, e.g.: ((k, [v for _, v in grp]) for k, grp in itertools.groupby(it, key=operator.itemgetter(0)))
- if tuples involved in the merge have more than one value for each key (as is the case in the linked recipe), use: ((t[0], t[1:]) for t in it)
- if key-value pairs for some iterator originate in a dict, use: sorted(mydict.items())
- if keys can be derived from the data by a function, use: ((fn(d), d) for d in it)
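Those transformations can be sketched concretely (the sample data and the key function are illustrative placeholders):

```python
from itertools import groupby
from operator import itemgetter

# Non-unique keys: collapse runs of equal keys into one (key, values) pair.
raw = [('a', 1), ('a', 2), ('b', 3)]
unique = [(k, [v for _, v in grp])
          for k, grp in groupby(raw, key=itemgetter(0))]
# unique == [('a', [1, 2]), ('b', [3])]

# More than one value per key: pack the trailing fields into one value.
wide = [('a', 1, 'x'), ('b', 2, 'y')]
pairs = [(t[0], t[1:]) for t in wide]
# pairs == [('a', (1, 'x')), ('b', (2, 'y'))]

# Dict source: sorting the items yields sorted (key, value) pairs.
inventory = {'pear': 7, 'apple': 3}
from_dict = sorted(inventory.items())
# from_dict == [('apple', 3), ('pear', 7)]

# Keys derived by a function (here len, as an example of fn).
words = ['bb', 'ccc', 'dddd']
keyed = [(len(d), d) for d in words]
# keyed == [(2, 'bb'), (3, 'ccc'), (4, 'dddd')]
```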

3 comments

Tom Ritchford 13 years, 4 months ago

I didn't know you could terminate a generator by raising an OutOfDataException - but I don't see the advantage of doing this over a simple "return" (which also ends the generator and doesn't create the exception object).

Joel Nothman (author) 13 years, 1 month ago

Edited accordingly. Thanks.

Joel Nothman (author) 13 years, 1 month ago

No, a return statement there would not be appropriate.

Intuitively, this algorithm returns a table. A return statement instead of an exception there would end the /row/ rather than the whole table.

But I guess it would be valid to return there and then test for an empty tuple.

Created by Joel Nothman on Tue, 22 Jul 2008 (MIT)