Welcome, guest | Sign In | My Account | Store | Cart

An indexed priority queue implemented in pure python as a dict-like class. It is a stripped-down version of pqdict. A Priority Queue Dictionary maps dictionary keys (dkeys) to updatable priority keys (pkeys).

The priority queue is implemented as a binary heap, which supports:

  • O(1) access to the top priority element

  • O(log n) removal of the top priority element

  • O(log n) insertion of a new element

In addition, an internal dictionary or "index" maps dictionary keys to the position of their entry in the heap. This index is maintained as the heap is manipulated. As a result, a PQ-dict also supports:

  • O(1) lookup of an arbitrary element's priority key

  • O(log n) removal of an arbitrary element

  • O(log n) updating of an arbitrary element's priority key

A data structure like this can be used to drive simulations, schedulers, certain greedy algorithms, merging streams of sorted data, and other applications where priorities may need to be changed frequently.

Python, 185 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from collections import MutableMapping

class _MinEntry(object):
    """
    Mutable entries for a Min-PQ dictionary.

    """
    def __init__(self, dkey, pkey):
        self.dkey = dkey    #dictionary key
        self.pkey = pkey    #priority key

    def __lt__(self, other):
        return self.pkey < other.pkey

class _MaxEntry(object):
    """
    Mutable entries for a Max-PQ dictionary.

    """
    def __init__(self, dkey, pkey):
        self.dkey = dkey
        self.pkey = pkey

    def __lt__(self, other):
        return self.pkey > other.pkey

class PQDict(MutableMapping):
    def __init__(self, *args, **kwargs):
        self._heap = []
        self._position = {}
        self.update(*args, **kwargs)

    create_entry = _MinEntry     #defaults to a min-pq
    @classmethod
    def maxpq(cls, *args, **kwargs):
        pq = cls()
        pq.create_entry = _MaxEntry
        pq.__init__(*args, **kwargs)
        return pq

    def __len__(self):
        return len(self._heap)

    def __iter__(self):
        for entry in self._heap:
            yield entry.dkey

    def __getitem__(self, dkey):
        return self._heap[self._position[dkey]].pkey

    def __setitem__(self, dkey, pkey):
        heap = self._heap
        position = self._position

        try:
            pos = position[dkey]
        except KeyError:
            # Add a new entry:
            # put the new entry at the end and let it bubble up
            pos = len(self._heap)
            heap.append(self.create_entry(dkey, pkey))
            position[dkey] = pos
            self._swim(pos)
        else:
            # Update an existing entry:
            # bubble up or down depending on pkeys of parent and children
            heap[pos].pkey = pkey
            parent_pos = (pos - 1) >> 1
            child_pos = 2*pos + 1
            if parent_pos > -1 and heap[pos] < heap[parent_pos]:
                self._swim(pos)
            elif child_pos < len(heap):
                other_pos = child_pos + 1
                if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                    child_pos = other_pos
                if heap[child_pos] < heap[pos]:
                    self._sink(pos)

    def __delitem__(self, dkey):
        heap = self._heap
        position = self._position

        pos = position.pop(dkey)
        entry_to_delete = heap[pos]

        # Take the very last entry and place it in the vacated spot. Let it
        # sink or swim until it reaches its new resting place.
        end = heap.pop(-1)
        if end is not entry_to_delete:
            heap[pos] = end
            position[end.dkey] = pos
            parent_pos = (pos - 1) >> 1
            child_pos = 2*pos + 1
            if parent_pos > -1 and heap[pos] < heap[parent_pos]:
                self._swim(pos)
            elif child_pos < len(heap):
                other_pos = child_pos + 1
                if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                    child_pos = other_pos
                if heap[child_pos] < heap[pos]:
                    self._sink(pos)
        del entry_to_delete

    def peek(self):
        try:
            entry = self._heap[0]
        except IndexError:
            raise KeyError
        return entry.dkey, entry.pkey

    def popitem(self):
        heap = self._heap
        position = self._position

        try:
            end = heap.pop(-1)
        except IndexError:
            raise KeyError

        if heap:
            entry = heap[0]
            heap[0] = end
            position[end.dkey] = 0
            self._sink(0)
        else:
            entry = end
        del position[entry.dkey]
        return entry.dkey, entry.pkey

    def iteritems(self):    
        # destructive heapsort iterator
        try:
            while True:
                yield self.popitem()
        except KeyError:
            return

    def _sink(self, top=0):
        # "Sink-to-the-bottom-then-swim" algorithm (Floyd, 1964)
        # Tends to reduce the number of comparisons when inserting "heavy" items
        # at the top, e.g. during a heap pop
        heap = self._heap
        position = self._position

        # Grab the top entry
        pos = top
        entry = heap[pos]
        # Sift up a chain of child nodes
        child_pos = 2*pos + 1
        while child_pos < len(heap):
            # choose the smaller child
            other_pos = child_pos + 1
            if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                child_pos = other_pos
            child_entry = heap[child_pos]
            # move it up one level
            heap[pos] = child_entry
            position[child_entry.dkey] = pos
            # next level
            pos = child_pos
            child_pos = 2*pos + 1
        # We are left with a "vacant" leaf. Put our entry there and let it swim 
        # until it reaches its new resting place.
        heap[pos] = entry
        position[entry.dkey] = pos
        self._swim(pos, top)

    def _swim(self, pos, top=0):
        heap = self._heap
        position = self._position

        # Grab the entry from its place
        entry = heap[pos]
        # Sift parents down until we find a place where the entry fits.
        while pos > top:
            parent_pos = (pos - 1) >> 1
            parent_entry = heap[parent_pos]
            if not entry < parent_entry:
                break
            heap[pos] = parent_entry
            position[parent_entry.dkey] = pos
            pos = parent_pos
        # Put entry in its new place
        heap[pos] = entry
        position[entry.dkey] = pos

The reheapify algorithm implementations _sink() and _swim() are based on _siftup() and _siftdown() from the pure python version of the heapq module. By default, items with smaller pkeys have higher priority.

Why include the "Entry" classes instead of just store items as, say, (dkey, pkey) tuples?

  1. The reheapify algorithms use only the "<" comparator, making it easy to change the ranking behavior from a min- to a max-heap or some other custom ranking order just by overloading __lt__ of the Entry class.

  2. If the items in the dictionary are to have changeable priority keys, it probably makes more sense to store them in the heap as something mutable. The caveat is that you'll get into trouble if you simply shallow copy a PQDict. You have to implement a copy method that copies the entry objects too.

A different approach would be for the reheapify algorithms to use a custom comparator function (returning -1, 0 or 1) to compare the dictionary's items, but I chose not to go down that route since cmp() is deprecated and we are encouraged to overload rich comparison operators for this type of thing.

Note on iterators:

__iter__() provides an iterator that returns items in arbitrary order, while iteritems() provides a generator that actually pops items off the heap in sorted order. Similar "heapsort iterators" over the dkeys and pkeys can easily be included.

Improvements?

One would probably be to tidy up the "not a < b" notation by throwing in the other rich comparison methods with a mixin...

1 comment

Grant Jenks 9 years, 7 months ago  # | flag

If you're looking for an API similar to that provided by a priority queue, check out the sortedcontainers module. It implements sorted list, sorted dict, and sorted set data types in pure-Python and is fast-as-C implementations (even faster!). Learn more about sortedcontainers, available on PyPI and github.