Memory efficient multi-way iterator merge.

Python, 39 lines
import heapq

def imerge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.

    Equivalent to:  sorted(itertools.chain(*iterables))

    >>> list(imerge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''
    heappop, siftup, _StopIteration = heapq.heappop, heapq._siftup, StopIteration

    h = []
    h_append = h.append
    for it in map(iter, iterables):
        try:
            next = it.next
            h_append([next(), next])
        except _StopIteration:
            pass
    heapq.heapify(h)

    while 1:
        try:
            while 1:
                v, next = s = h[0]      # raises IndexError when h is empty
                yield v
                s[0] = next()           # raises StopIteration when exhausted
                siftup(h, 0)            # restore heap condition
        except _StopIteration:
            heappop(h)                  # remove empty iterator
        except IndexError:
            return


if __name__ == '__main__':
    import doctest
    doctest.testmod()

Improves on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/197530 by accepting a variable number of arguments, by using heapq for speed, and by using local variables instead of global or attribute access.

After each step, the value in the topmost heap element is updated in place and the heap condition is restored with heapq._siftup(). This avoids the expensive resizing and excess comparisons incurred by a heappop() followed by a heappush().
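For readers on Python 3, the in-place update described above can be sketched roughly as follows. This is not the recipe's code: the name imerge3 is hypothetical, the public heapq.heapreplace() stands in for the private heapq._siftup(), and an extra position index is added to each heap entry so that equal values never force a comparison between bound methods (which Python 3 does not allow). Note also that heapq.merge() in the standard library (Python 2.6 and later) covers the same use case.

```python
import heapq
import itertools

def imerge3(*iterables):
    """Python 3 sketch of the merge above (hypothetical name, not the recipe's code)."""
    heap = []
    for order, iterable in enumerate(iterables):
        advance = iter(iterable).__next__
        try:
            # [value, order, advance]: `order` is an added tie-breaker
            heap.append([advance(), order, advance])
        except StopIteration:
            pass                          # skip inputs that are empty from the start
    heapq.heapify(heap)

    while heap:
        entry = heap[0]                   # smallest entry; left in place in the heap
        value, _, advance = entry
        yield value
        try:
            entry[0] = advance()          # update the top entry in place
        except StopIteration:
            heapq.heappop(heap)           # this input is exhausted; drop it
        else:
            heapq.heapreplace(heap, entry)  # re-sift the top entry, no resize

# Only one item per input is held at a time, so unbounded inputs work too.
evens = itertools.count(0, 2)
odds = itertools.count(1, 2)
first_five = list(itertools.islice(imerge3(evens, odds), 5))
print(first_five)   # [0, 1, 2, 3, 4]
```

Passing the already-updated top entry to heapreplace() leaves the list length unchanged and only restores the heap condition, which is the effect the paragraph above attributes to the sift step.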

Suggested uses: 1) Merging log files based on (timestamp, entry) tuples 2) Merging sorted disk files too large to fit in memory all at once
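As a small illustration of the first suggested use, here is a sketch that merges two already-sorted logs of (timestamp, entry) tuples. The log contents are made up for the example, and it uses the standard library's heapq.merge() (assumed Python 3 here) rather than the recipe's imerge(); tuples compare by timestamp first, so the merged stream comes out in time order.

```python
import heapq

# Hypothetical, already-sorted logs of (timestamp, entry) tuples.
log_a = [(1, "a: start"), (4, "a: stop")]
log_b = [(2, "b: start"), (3, "b: stop")]

# heapq.merge() is lazy: it yields entries one at a time in timestamp order.
merged = list(heapq.merge(log_a, log_b))
for timestamp, entry in merged:
    print(timestamp, entry)
```

For files too large for memory, the same call would take one iterator of parsed lines per file in place of the in-memory lists.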

5 comments

Calvin Spealman 17 years, 7 months ago

Iterable Merge or Sequence Merge? This has the flaw that it assumes we aren't really merging iterables but sequences, or at least finite-length, probably already sorted iterables. A really efficient imerge would not pull everything into a heapq before even the first iteration. What a hit if you just need the first item!

That is why the original recipes, though more complicated than need be, tried to look at the next value of each iterable, find which should come next, and yield it immediately. That is a large part of the value of iterators and the whole concept.

Anyway, if you do need to just sort sequences or non-infinite iterators, this is a lot cleaner and falls back on trusting the built-in sorted to be faster than anything else:

def merge(*iters):
    iters = list(iter(i) for i in iters)
    while iters:
        for i in iters:
            try:
                yield i.next()
            except StopIteration, e:
                iters.remove(i)

print sorted(merge([1,2,3,5], [4,5]))

Calvin Spealman 17 years, 7 months ago

OOPS. forget i said anything. dang ASPN, where is the comment delete button?!

wang mm 17 years, 6 months ago

An improvement on the previous version:

def merge(*iters):
    myIters = (iter(i) for i in iters)
    for i in myIters:
        while True:
            try:
                yield i.next()
            except StopIteration, e:
                break

print sorted(merge([1,2,3,5], [4,5]))

Vincent Kraeutler 17 years, 6 months ago

why push v when we know there's no more there? it seems that rather than doing this:

while h:
    v, next = heappop(h)
    yield v
    try:
        v = next()
    except StopIteration:
        continue
    heappush(h, (v, next))

doing this might be slightly more efficient (i.e. you don't push iterators for iterables that have no more items back on the heap):

while h:
    v, next = heappop(h)
    yield v
    try:
        v = next()
        heappush(h, (v, next))
    except StopIteration:
        continue

not too familiar with the heapq module though ;-)

Raymond Hettinger (author) 17 years, 3 months ago

v stops getting pushed when empty. The push step isn't reached when the preceding step raises StopIteration.
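A quick way to check this is to run both loop variants from the comment above side by side. The sketch below assumes Python 3; the function names and the _start() helper are hypothetical, and the extra order field in each heap entry is an added tie-breaker (not part of the commented code) so that equal values never compare bound methods.

```python
from heapq import heapify, heappop, heappush

def _start(iterables):
    # Build the initial heap of (value, order, advance) tuples.
    h = []
    for order, iterable in enumerate(iterables):
        advance = iter(iterable).__next__
        try:
            h.append((advance(), order, advance))
        except StopIteration:
            pass                      # empty input: nothing to add
    heapify(h)
    return h

def merge_push_after_try(*iterables):
    h = _start(iterables)
    while h:
        v, order, advance = heappop(h)
        yield v
        try:
            v = advance()
        except StopIteration:
            continue                  # jumps past the push below
        heappush(h, (v, order, advance))

def merge_push_inside_try(*iterables):
    h = _start(iterables)
    while h:
        v, order, advance = heappop(h)
        yield v
        try:
            v = advance()
            heappush(h, (v, order, advance))   # not reached if advance() raises
        except StopIteration:
            continue

inputs = ([1, 3, 5, 7], [0, 2, 4, 8], [5, 10, 15, 20], [], [25])
result_a = list(merge_push_after_try(*inputs))
result_b = list(merge_push_inside_try(*inputs))
print(result_a == result_b)   # True
```

In both variants an exhausted iterator is never pushed back, so the two loops do the same work.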