An indexed priority queue implemented in pure Python as a dict-like class. It is a stripped-down version of pqdict. A Priority Queue Dictionary maps dictionary keys (dkeys) to updatable priority keys (pkeys).
The priority queue is implemented as a binary heap, which supports:
O(1) access to the top priority element
O(log n) removal of the top priority element
O(log n) insertion of a new element
In addition, an internal dictionary or "index" maps dictionary keys to the position of their entry in the heap. This index is maintained as the heap is manipulated. As a result, a PQ-dict also supports:
O(1) lookup of an arbitrary element's priority key
O(log n) removal of an arbitrary element
O(log n) updating of an arbitrary element's priority key
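To make the role of the index concrete, here is a minimal sketch of a min-heap that keeps a position dictionary in sync on every swap. The class name TinyIndexedHeap and its set()/pop() methods are hypothetical and exist only for this illustration; it uses plain swaps rather than the hole-moving reheapify used in the recipe itself.

```python
class TinyIndexedHeap:
    """Hypothetical, simplified min-heap of [pkey, dkey] entries plus an index."""

    def __init__(self):
        self.heap = []      # [pkey, dkey] lists arranged as a binary heap
        self.position = {}  # dkey -> index of its entry in self.heap

    def _swap(self, i, j):
        # Every move in the heap is mirrored in the index.
        heap, position = self.heap, self.position
        heap[i], heap[j] = heap[j], heap[i]
        position[heap[i][1]] = i
        position[heap[j][1]] = j

    def _swim(self, i):
        while i > 0:
            parent = (i - 1) >> 1
            if not self.heap[i][0] < self.heap[parent][0]:
                break
            self._swap(i, parent)
            i = parent

    def _sink(self, i):
        n = len(self.heap)
        while 2 * i + 1 < n:
            child = 2 * i + 1
            if child + 1 < n and self.heap[child + 1][0] < self.heap[child][0]:
                child += 1  # pick the smaller child
            if not self.heap[child][0] < self.heap[i][0]:
                break
            self._swap(i, child)
            i = child

    def set(self, dkey, pkey):
        """Insert dkey, or update its pkey if it is already present."""
        if dkey in self.position:
            i = self.position[dkey]  # O(1) lookup through the index
            self.heap[i][0] = pkey
        else:
            i = len(self.heap)
            self.heap.append([pkey, dkey])
            self.position[dkey] = i
        self._swim(i)                     # at most one of these two calls
        self._sink(self.position[dkey])   # actually moves the entry

    def pop(self):
        """Remove and return the (dkey, pkey) pair with the smallest pkey."""
        if not self.heap:
            raise KeyError("heap is empty")
        self._swap(0, len(self.heap) - 1)
        pkey, dkey = self.heap.pop()
        del self.position[dkey]
        if self.heap:
            self._sink(0)
        return dkey, pkey


pq = TinyIndexedHeap()
for dkey, pkey in [("a", 5), ("b", 3), ("c", 8)]:
    pq.set(dkey, pkey)
pq.set("c", 1)    # update an existing key: "c" moves to the top
first = pq.pop()  # ("c", 1)
```

Without the position dictionary, finding "c" before updating it would take a linear scan of the heap.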
A data structure like this can be used to drive simulations and schedulers, implement certain greedy algorithms, merge streams of sorted data, and support other applications where priorities may need to be changed frequently.
try:
    from collections.abc import MutableMapping  # Python 3.3+
except ImportError:
    from collections import MutableMapping      # Python 2

class _MinEntry(object):
    """
    Mutable entries for a Min-PQ dictionary.
    """
    def __init__(self, dkey, pkey):
        self.dkey = dkey    #dictionary key
        self.pkey = pkey    #priority key

    def __lt__(self, other):
        return self.pkey < other.pkey

class _MaxEntry(object):
    """
    Mutable entries for a Max-PQ dictionary.
    """
    def __init__(self, dkey, pkey):
        self.dkey = dkey
        self.pkey = pkey

    def __lt__(self, other):
        return self.pkey > other.pkey

class PQDict(MutableMapping):
    def __init__(self, *args, **kwargs):
        self._heap = []
        self._position = {}
        self.update(*args, **kwargs)

    create_entry = _MinEntry    #defaults to a min-pq

    @classmethod
    def maxpq(cls, *args, **kwargs):
        pq = cls()
        pq.create_entry = _MaxEntry
        pq.__init__(*args, **kwargs)
        return pq

    def __len__(self):
        return len(self._heap)

    def __iter__(self):
        for entry in self._heap:
            yield entry.dkey

    def __getitem__(self, dkey):
        return self._heap[self._position[dkey]].pkey

    def __setitem__(self, dkey, pkey):
        heap = self._heap
        position = self._position

        try:
            pos = position[dkey]
        except KeyError:
            # Add a new entry:
            # put the new entry at the end and let it bubble up
            pos = len(self._heap)
            heap.append(self.create_entry(dkey, pkey))
            position[dkey] = pos
            self._swim(pos)
        else:
            # Update an existing entry:
            # bubble up or down depending on pkeys of parent and children
            heap[pos].pkey = pkey
            parent_pos = (pos - 1) >> 1
            child_pos = 2*pos + 1
            if parent_pos > -1 and heap[pos] < heap[parent_pos]:
                self._swim(pos)
            elif child_pos < len(heap):
                other_pos = child_pos + 1
                if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                    child_pos = other_pos
                if heap[child_pos] < heap[pos]:
                    self._sink(pos)

    def __delitem__(self, dkey):
        heap = self._heap
        position = self._position

        pos = position.pop(dkey)
        entry_to_delete = heap[pos]

        # Take the very last entry and place it in the vacated spot. Let it
        # sink or swim until it reaches its new resting place.
        end = heap.pop(-1)
        if end is not entry_to_delete:
            heap[pos] = end
            position[end.dkey] = pos

            parent_pos = (pos - 1) >> 1
            child_pos = 2*pos + 1
            if parent_pos > -1 and heap[pos] < heap[parent_pos]:
                self._swim(pos)
            elif child_pos < len(heap):
                other_pos = child_pos + 1
                if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                    child_pos = other_pos
                if heap[child_pos] < heap[pos]:
                    self._sink(pos)
        del entry_to_delete

    def peek(self):
        try:
            entry = self._heap[0]
        except IndexError:
            raise KeyError
        return entry.dkey, entry.pkey

    def popitem(self):
        heap = self._heap
        position = self._position

        try:
            end = heap.pop(-1)
        except IndexError:
            raise KeyError

        if heap:
            entry = heap[0]
            heap[0] = end
            position[end.dkey] = 0
            self._sink(0)
        else:
            entry = end
        del position[entry.dkey]
        return entry.dkey, entry.pkey

    def iteritems(self):
        # destructive heapsort iterator
        try:
            while True:
                yield self.popitem()
        except KeyError:
            return

    def _sink(self, top=0):
        # "Sink-to-the-bottom-then-swim" algorithm (Floyd, 1964)
        # Tends to reduce the number of comparisons when inserting "heavy" items
        # at the top, e.g. during a heap pop
        heap = self._heap
        position = self._position

        # Grab the top entry
        pos = top
        entry = heap[pos]
        # Sift up a chain of child nodes
        child_pos = 2*pos + 1
        while child_pos < len(heap):
            # choose the smaller child
            other_pos = child_pos + 1
            if other_pos < len(heap) and not heap[child_pos] < heap[other_pos]:
                child_pos = other_pos
            child_entry = heap[child_pos]
            # move it up one level
            heap[pos] = child_entry
            position[child_entry.dkey] = pos
            # next level
            pos = child_pos
            child_pos = 2*pos + 1
        # We are left with a "vacant" leaf. Put our entry there and let it swim
        # until it reaches its new resting place.
        heap[pos] = entry
        position[entry.dkey] = pos
        self._swim(pos, top)

    def _swim(self, pos, top=0):
        heap = self._heap
        position = self._position

        # Grab the entry from its place
        entry = heap[pos]
        # Sift parents down until we find a place where the entry fits.
        while pos > top:
            parent_pos = (pos - 1) >> 1
            parent_entry = heap[parent_pos]
            if not entry < parent_entry:
                break
            heap[pos] = parent_entry
            position[parent_entry.dkey] = pos
            pos = parent_pos
        # Put entry in its new place
        heap[pos] = entry
        position[entry.dkey] = pos
The reheapify algorithm implementations _sink() and _swim() are based on _siftup() and _siftdown() from the pure Python version of the heapq module. By default, items with smaller pkeys have higher priority.
Why include the "Entry" classes instead of just storing items as, say, (dkey, pkey) tuples?
The reheapify algorithms use only the "<" comparator, making it easy to change the ranking behavior from a min- to a max-heap or some other custom ranking order just by overloading __lt__ of the Entry class.
If the items in the dictionary are to have changeable priority keys, it probably makes more sense to store them in the heap as something mutable. The caveat is that you'll get into trouble if you simply shallow copy a PQDict, because the copy ends up sharing the mutable entry objects with the original. You have to implement a copy method that copies the entry objects too.
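Both points can be seen in isolation with the standard library. The sketch below is not the recipe's own code: it borrows heapq (which also compares items only with "<") and uses hypothetical MinEntry/MaxEntry classes and a drain() helper to show that overloading __lt__ alone flips the ordering. The last few lines show why a shallow copy of a container of mutable entries is risky.

```python
import copy
import heapq


class MinEntry:
    """Hypothetical stand-in for the recipe's _MinEntry."""

    def __init__(self, dkey, pkey):
        self.dkey = dkey
        self.pkey = pkey

    def __lt__(self, other):
        return self.pkey < other.pkey


class MaxEntry(MinEntry):
    """Same data, reversed "<", so the largest pkey ranks first."""

    def __lt__(self, other):
        return self.pkey > other.pkey


def drain(entry_cls, items):
    """Heapify the items wrapped in entry_cls and pop them all, returning dkeys."""
    heap = [entry_cls(dkey, pkey) for dkey, pkey in items]
    heapq.heapify(heap)
    return [heapq.heappop(heap).dkey for _ in range(len(heap))]


items = [("a", 3), ("b", 1), ("c", 2)]
min_order = drain(MinEntry, items)  # ['b', 'c', 'a']
max_order = drain(MaxEntry, items)  # ['a', 'c', 'b']

# Shallow vs. deep copy of a list of mutable entries
original = [MinEntry("a", 1)]
shallow = copy.copy(original)      # new list, same entry objects
deep = copy.deepcopy(original)     # new list, new entry objects
shallow[0].pkey = 99               # also changes original[0].pkey
```

After the last line, original[0].pkey is 99 while deep[0].pkey is still 1, which is the trouble the caveat above warns about.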
A different approach would be for the reheapify algorithms to use a custom comparator function (returning -1, 0 or 1) to compare the dictionary's items, but I chose not to go down that route since cmp() is deprecated (and no longer exists in Python 3) and we are encouraged to overload rich comparison operators for this type of thing.
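For comparison, a cmp-style function can still be plugged into "<"-based code through functools.cmp_to_key from the standard library. The following is only a sketch of that alternative, not what the recipe does: the comparator max_first and the sample items are made up for illustration, and heapq stands in for the recipe's own reheapify routines.

```python
import heapq
from functools import cmp_to_key


def max_first(a, b):
    """cmp-style comparator on (dkey, pkey) tuples: larger pkey ranks first.

    Returns a negative number if a should come before b, 0 if they tie,
    and a positive number otherwise.
    """
    a_pkey, b_pkey = a[1], b[1]
    return (a_pkey < b_pkey) - (a_pkey > b_pkey)


# cmp_to_key returns a wrapper class whose instances implement the rich
# comparisons by calling max_first on the wrapped objects.
wrap = cmp_to_key(max_first)

heap = [wrap(item) for item in [("x", 2), ("y", 9), ("z", 5)]]
heapq.heapify(heap)

order = []
while heap:
    order.append(heapq.heappop(heap).obj)  # .obj is the wrapped tuple
```

Here order ends up as [("y", 9), ("z", 5), ("x", 2)]. Every comparison goes through an extra function call, which is one more reason to prefer overloading __lt__ directly.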
Note on iterators: __iter__() provides an iterator that returns dkeys in arbitrary order, while iteritems() provides a generator that actually pops items off the heap in sorted order, so the PQDict is empty once the generator is exhausted. Similar "heapsort iterators" over the dkeys and pkeys can easily be included.
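If the destructive behavior is not wanted, one option is to run the heapsort on a scratch copy. The sketch below illustrates the idea with heapq and immutable (pkey, dkey) tuples rather than with PQDict itself; iter_sorted is a hypothetical helper name. With the recipe's mutable entry objects, the copy would have to duplicate the entries (or at least never modify them) for this to be safe.

```python
import heapq


def iter_sorted(heap):
    """Yield the items of a heapq-style heap in ascending order.

    Works on a scratch copy, so the caller's heap is left untouched.
    A copy of a valid heap list is itself a valid heap.
    """
    scratch = list(heap)
    while scratch:
        yield heapq.heappop(scratch)


heap = [(3, "c"), (1, "a"), (2, "b")]
heapq.heapify(heap)

in_order = list(iter_sorted(heap))  # [(1, 'a'), (2, 'b'), (3, 'c')]
remaining = len(heap)               # still 3: nothing was popped from heap
```

The price is O(n) extra memory for the copy, on top of the O(n log n) pops that the destructive version also pays.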
Improvements?
One would probably be to tidy up the "not a < b" notation by throwing in the other rich comparison methods with a mixin...
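One way this could look, as a sketch only: functools.total_ordering (a class decorator rather than a mixin) derives the remaining comparison methods from __eq__ and __lt__, so "not a < b" can be written as "a >= b". The Entry class below is a hypothetical variant of the recipe's _MinEntry.

```python
from functools import total_ordering


@total_ordering
class Entry:
    """Hypothetical entry class with the full set of rich comparisons."""

    def __init__(self, dkey, pkey):
        self.dkey = dkey
        self.pkey = pkey

    # total_ordering needs __eq__ plus one ordering method.
    # Note: in Python 3, defining __eq__ without __hash__ makes
    # instances unhashable, which is fine for heap entries.
    def __eq__(self, other):
        return self.pkey == other.pkey

    def __lt__(self, other):
        return self.pkey < other.pkey


a, b = Entry("a", 1), Entry("b", 2)
checks = (a <= b, b >= a, b > a, a >= b)  # (True, True, True, False)
```

The derived methods are slower than hand-written ones, so in a hot loop such as the reheapify routines the plain "not a < b" may still be the better trade-off.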
If you're looking for an API similar to that provided by a priority queue, check out the sortedcontainers module. It implements sorted list, sorted dict, and sorted set data types in pure Python and is as fast as C implementations (even faster!). Learn more about sortedcontainers, available on PyPI and GitHub.