ActiveState Code

Recipe 81693: Heap-based priority queue


This is a Priority Queue based on a heap data strucuture. Elements come out of the queue least first. The heap is a complete binary tree with root at _a[1] and, for a node N, its parent is _a[N>>1] and children are _a[2N] and _a[2N+1].

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class PriQueue:
    """Least-first priority queue (elements must be comparable).

    To use another ordering, fix the three places marked # comparison.
    q=PriQueue(lst) Creates a priority queue with all entries in list
    i=len(q)      count of values in queue
    q.push(v)     Add entry to queue
    vb=q.cream(v) Add v to queue, and then remove and return the least entry.
    v=q.top()     view least entry in queue
    v=q.pop()     remove and return least entry in queue
    v=q.pops()    remove& return all entries in the queue (least first).
    v=q.pops(i)   remove&return the i least entries in the queue (least first).
    v=q.pops(-i)  remove&return all but the i greatest entries in the queue
                  (least first).
    """
    def __init__(self, data=[]):
        """create a new priority Queue (build heap structure)"""
        self._a = [None] * (1 + len(data))
        for i in range(1, len(self._a)):
            self._bubbleup(i, data[i-1])

    def __len__(self):
        """number of elements currently in the priority queue"""
        return len(self._a)-1

    def top(self):
        """view top item v w/o removing it"""
        return self._a[1]

    def pop(self):
        """pop v from PQ and return it."""
        a = self._a
        result = a[1]
        leaf = self._holetoleaf(1)
        if leaf != len(self):
            a[leaf] = a[-1]
            self._bubbleup(leaf, a[-1])
        self._a = a[:-1]
        return result

    def pops(self, maxRemove=None):
        """pop maxRemove elements from PQ and return a list of results

        If maxRemove < 0, all but maxRemove elements.  If >= 0,
        remove at most maxRemove elements (max len, of course).
        If None, remove all elements.  The result list is "best" first."""
        if maxRemove is None:
            maxRemove = len(self)
        elif maxRemove < 0:
            maxRemove += len(self)
        elif maxRemove > len(self):
            maxRemove = len(self)
        result = [None] * maxRemove
        for i in range(maxRemove):
            result[i] = self.pop()
        return result

    def push(self, v):
        """Add entry v to priority queue"""
        self._a.append(v)
        self._bubbleup(len(self), v)

    def pushes(self, lst):
        """Add all elements in lst to priority queue"""
        pos = len(self)
        self._a += lst
        for leaf in range(pos, len(self._a)):
            self._bubbleup(leaf, self._a[leaf])

    def cream(self, v):
        """Replace top entry in priority queue with v, then return best"""
        if 0 < len(self) and self._a[1] < v:  # comparison with v
            r = self._a[1]
            leaf = self._holetoleaf(1)
            self._bubbleup(leaf, v)
        else:
            r = v
        return r

    def _drop(self, node):
        """drop an entry from PQ."""
        leaf = self._holetoleaf(node)
        if leaf != len(self):
            self._a[leaf] = self._a[-1]
        del self._a[-1]

    def _holetoleaf(self, node):
        """Drive the empty cell at node to the leaf level. Return position"""
        limit = len(self)
        a = self._a
        kid = node * 2
        while kid < limit:
            if not a[kid] < a[kid + 1]:    # comparison
                kid += 1
            a[node] = a[kid]
            node = kid
            kid = node * 2
        if kid == limit:
            a[node] = a[kid]
            return kid
        return node

    def _bubbleup(self, node, v):
        """a[i] is as low as v need be.  Bubble it up."""
        a = self._a
        while node > 1:
            parent = node >> 1
            if not v < a[parent]:          # comparison
                break
            a[node] = a[parent]
            node = parent
        a[node] = v

    def _push(self, v, i):
        """Add entry v to priority queue replacing position i"""
        leaf = self._holetoleaf(i)
        self._bubbleup(leaf, v)

    def __repr__(self):
        return ''.join([self.__class__.__name__, '(', repr(self._a[1:]), ')'])

Discussion

A priority queue can be used as the heart of a time-based simulation. Tasks are added with the time to perform them; the top of the queue is always the next thing to do. The "cream" operation is for a very common simulation operation: an element is removed and replaced with a new priority. "Cream" saves the deallocate/allocate operations that come with such an operation.

In this code the whole entry is compared. Often you have a priority/value pair. In such cases (if priorities can be equal), you should change the comparison to look only at priorities. There are three places (each marked "# comparison") to do so. The test should be "strictly less than" (false on equal) for best performance.

"cream" can be used to get the "largest N" entries in a list as follows: pq = PriQueue(list[:N]) for v in list[N:]: pq.cream(v) largestN = pq.pops()

Don't ry to speed up the code by not going all of the way to a leaf: at least half of every binary tree is at a leaf, so you'll usually wind up there. If you slow down the comparisons getting to the leaf, you'll not recover the performance by not going as deep.

Sign in to comment