Welcome, guest | Sign In | My Account | Store | Cart

Memory efficient multi-way iterator merge, without using heapq

Python, 135 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def itermerge( *iters ):
    '''Merge multiple sorted inputs into a single sorted output.

    Equivalent to:  sorted(itertools.chain(*iterables))

    >>> list(imerge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''

    def merge( i1, i2 ):
        next1 = iter( i1 ).next
        next2 = iter( i2 ).next
        try:        
            v1 = next1()
        except StopIteration:
            while True:
                yield next2()
        try:
            v2 = next2()
        except StopIteration:
            while True:
                yield next1()
        while True:
            if v1 < v2:
                yield v1
                try:        
                    v1 = next1()
                except StopIteration:
                    yield v2
                    while True:
                        yield next2()
            else:
                yield v2
                try:
                    v2 = next2()
                except StopIteration:
                    yield v1
                    while True:
                        yield next1()
    iters_cnt = len( iters )
    if iters_cnt == 1:
        return iter( iters[0] )
    if iters_cnt == 2:
        return merge( iters[0], iters[1] )
    bisect = iters_cnt / 2
    return merge( itermerge( *iters[:bisect] ), itermerge( *iters[bisect:] ) )
if __name__ == '__main__':
    import doctest
    import itertools
    import heapq
    import random
    from timeit import Timer
    def test_data( dataset_cnt=5, sizerange=(100, 100) ):
        print ( "%s sets between %s and %s"
                    % ( dataset_cnt, sizerange[0], sizerange[1] ) )
        datasets = []
        for li in xrange( dataset_cnt ):
            d = []
            for i in xrange( int( random.uniform( *sizerange ) ) ):
                d.append( int( random.uniform( 0, 99 ) ) )
            d.sort()
            datasets.append( d )
        return tuple( datasets )

    def isort( *iters ):
        return iter( sorted( itertools.chain( *iters ) ) )


    #
    # From http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/491285
    #  for comparison
    #


    def imerge(*iterables):
        '''Merge multiple sorted inputs into a single sorted output.

        Equivalent to:  sorted(itertools.chain(*iterables))

        >>> list(imerge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
        [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

        '''
        heappop, siftup, _StopIteration = heapq.heappop, heapq._siftup, StopIteration

        h = []
        h_append = h.append
        for it in map(iter, iterables):
            try:
                next = it.next
                h_append([next(), next])
            except _StopIteration:
                pass
        heapq.heapify(h)

        while 1:
            try:
                while 1:
                    v, next = s = h[0]      # raises IndexError when h is empty
                    yield v
                    s[0] = next()           # raises StopIteration when exhausted
                    siftup(h, 0)            # restore heap condition
            except _StopIteration:
                heappop(h)                  # remove empty iterator
            except IndexError:
                return
    #
    #  End
    #
    datasets = test_data( 10, ( 1, 10000 ) )
    verify_result = list( itertools.chain( *datasets ) )
    verify_result.sort()
    test_cnt = 3
    print "%s tests per timing run" % test_cnt

    def test( what ):
        name = what.__name__
        print name," "
        if list( what( *datasets ) ) == verify_result:
            print "Success!!"
        else:
            print "failure"
            return
        set_up = "from __main__ import %s, datasets; gc.enable()" % name
        test_it = "list( %s( *datasets ) )" % name
        print Timer( test_it, set_up ).timeit( test_cnt )

    test( isort )
    test( itermerge )
    test( imerge )

if __name__ == '__main__':
    import doctest
    doctest.testmod()

Inspired by http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/491285, itermerge has behavior Identical to imerge from that recipe, (from which I borrowed the doc string), but is generally faster.

itermerge returns an iterator that returns the sorted items for a sequence of iterators on sorted iterables. It creates a merge tree by calling iself recursively on bisections of the input set of iteratables. When the average length of the iterables is very short, between 2 and 3, imerge is faster. As the above recipe suggests this is best used for merging external sorted sequences too large to fit in memory. If the iterables fit, it it probably faster to use sort/sorted.

From above recipe. Suggested uses: 1) Merging log files based on (timestamp, entry) tuples 2) Merging sorted disk files too large to fit in memory all at once