Welcome, guest | Sign In | My Account | Store | Cart

Merge a (possibly infinite) number of already sorted inputs (each of possibly infinite length) into a single sorted output.

Similar to heapq.merge and sorted(itertools.chain(*iterables)).

Like heapq.merge, returns a generator, does not pull the data into memory all at once, and assumes that each of the input iterables is already sorted (smallest to largest).

Unlike heapq.merge, accepts an infinite number of input iterables, but requires all of them to come in ascending order (that is, their starting point must come in ascending order).

In addition, accepts a key function (like sorted, min, max, etc.)

Python, 131 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# mergeinf.py
# (C) 2010 Gabriel Genellina

from heapq import heappop, heapreplace, heapify
from operator import attrgetter

__all__ = ['imerge']

# 3.x compatibility
try:
    iter(()).next
except AttributeError:
    next_function_getter = attrgetter('__next__')
    class IterRecord(list):
      def __eq__(self, other): return self[0]==other[0]
      def __lt__(self, other): return self[0]<other[0]
      def __le__(self, other): return self[0]<=other[0]
      def __ne__(self, other): return self[0]!=other[0]
      def __gt__(self, other): return self[0]>other[0]
      def __ge__(self, other): return self[0]>=other[0]
else:    
    next_function_getter = attrgetter('next')
    IterRecord = list

    
def imerge(iterables, key=None):
    '''Merge a (possibly infinite) number of already sorted inputs 
    (each of possibly infinite length) into a single sorted output.

    Similar to heapq.merge and sorted(itertools.chain(*iterables)).

    Like heapq.merge, returns a generator, does not pull the data 
    into memory all at once, and assumes that each of the input 
    iterables is already sorted (smallest to largest).

    Unlike heapq.merge, accepts an infinite number of input iterables, 
    but requires all of them to come in ascending order (that is, 
    their starting point must come in ascending order).

    In addition, accepts a *key* function (like `sorted`, `min`, 
    `max`, etc.)
    
    >>> list(imerge([[1,3,5,7], [2,4,8], [5,10,15,20], [], [25]]))
    [1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]
    '''

    _heappop, _heapreplace, _heapify, _StopIteration = heappop, heapreplace, heapify, StopIteration
    _iter, _next, _len, _next_function_getter = iter, next, len, next_function_getter

    h = []
    h_append = h.append
    iterables = _iter(iterables)

    more_iterables = True
    while _len(h)<2:
        try:
            # raises StopIteration when no more iterables
            next_item = _next_function_getter(_iter(_next(iterables)))
        except _StopIteration:
            more_iterables = False
            break
        try:
            v = next_item()
        except _StopIteration:
            # ignore empty iterables
            continue
        if key is not None:
            highest = key(v)
        else:
            highest = v
        h_append(IterRecord([highest, v, next_item]))

    if _len(h) >= 2:
        # the heap invariant should hold, if input iterables come already sorted
        if h[1][0] < h[0][0]:
            raise ValueError('items out of order: %r and %r' % (h[0][0], h[1][0]))

    elif _len(h) == 1:
        # a single iterable, just send it
        assert not more_iterables
        _, v, next_item = h[0]
        yield v
        try:
            while True:
                yield next_item()
        except _StopIteration:
            return

    else:
        # empty
        return

    cur = highest
    while h:
        _, v, next_item = s = h[0]
        yield v

        try:
            v = s[1] = next_item()   # raises StopIteration when no more items
        except _StopIteration:
            _heappop(h)              # remove empty iterator
        else:
            if key is not None:
                cur = s[0] = key(v)
            else:
                cur = s[0] = v
            _heapreplace(h, s)       # restore heap condition

        # 'highest' is the highest known item in the heap.
        # Any time we advance an iterable and get an item ('cur')
        # greater than 'highest', we must bring more enough iterables
        # into play to ensure no items are missed.
        if more_iterables and (cur >= highest or _len(h) < 2):
            while cur >= highest or _len(h)<2:
                try:
                    # raises StopIteration when no more iterables
                    next_item = _next_function_getter(_iter(_next(iterables)))
                except _StopIteration:
                    more_iterables = False
                    break
                try:
                    v = next_item()
                except _StopIteration:
                    # ignore empty iterables
                    continue
                if key is not None:
                    highest = key(v)
                else:
                    highest = v
                h_append(IterRecord([highest, v, next_item]))
            _heapify(h)

This recipe is based on heapq.merge in the standard library, modified to be able to accept an infinite number of iterables.

heapq.merge requires all iterables to be already ordered; in addition, this recipe requires all iterables to come in ascending order (that is, their starting point must be in ascending order). This is important in order to be able to handle an infinite number of input streams. Input iterables start to be consumed only when needed, e.g.:

imerge([[0,5,10,15,20],
        xrange(10,40,2),
        xrange(30,40,3),
        [],
        [35,38,42]])

starts to look at the last iterable [35 ...] only after yielding the number 28 (taken from the second one).

Example: Enumerate a regular language (that is, generate all possible strings matching a given regular expression), so that shorter strings come before the longer ones.

If the regular expression contains unbounded repeaters (like *, + or {n,}) there will be infinite matches; we want the shorter ones first, or else we might never see them (think of 'a*|b'; we want 'b' to appear early, or it could be infinitely delayed after all those 'aaaaaaaaaaaaaaaa...').

Given '(a|bc)*', we can think of * as meaning '(a|bc){0}' (repeat zero times) merged with '(a|bc){1}' (repeat once) merged with '(a|bc){2}' merged with ...

'(a|bc){0}' = ''
'(a|bc){1}' = 'a', 'bc'
'(a|bc){2}' = 'aa', 'abc', 'bca', 'bcbc'
'(a|bc){3}' = 'aaa', 'aabc', 'abca', 'bcaa', 'abcbc', 'bcabc', 'bcbca', 'bcbcbc'
...

Merging all those (infinite) sequences gives:

'', 'a', 'aa', 'bc',
'aaa', 'abc', 'bca',
'aaaa', 'aabc', 'abca', 'bcaa', 'bcbc',
'aaaaa', 'aaabc', 'aabca', 'abcaa', 'abcbc', 'bcaaa', 'bcabc', 'bcbca',
...

The code to generate those strings (just a demo, real usage in another recipe):

from itertools import product, islice

def by_length(s):
    return (len(s), s)

def products(xs):
    current = ['']
    yield current
    while True:
        current = sorted((x+y for x,y in product(current, xs)), key=by_length)
        yield current

def closure(xs):
    return imerge(products(xs), key=by_length)

for x in closure(['a', 'bc']):
    print(x)

We can look at any part of the output sequence with:

for x in islice(closure(['a', 'bc']), 2000, 2010):
    print(x)

abcaaaabcabcabc
abcaaaabcabcbca
abcaaaabcbcaaaa
abcaaaabcbcaabc
...