The winnow class uses a heap for finding the best few out of several items. At this it is quicker and shorter than Python 2.3's heapq module, which is aimed at queuing rather than sifting. On the other hand, it is unlikely to have any advantage over 2.4's heapq, which (I hear) has expanded functionality and is implemented in C.
```python
class winnow:
    """A heap structure allows you to easily keep track of one extreme
    of a set of values, at the expense of most other forms of order.
    The theory is explained in many places, including the heapq
    documentation (http://www.python.org/doc/2.3.4/lib/node162.html).

    The winnow heap is implemented as a fixed length list with the
    worst item at position 0.  Candidates worse than item 0 are
    ignored; when one is better, item 0 is dropped and the worst
    remaining item is brought to the front.  This happens in
    winnow.test.

    Because the length of its heap is unchanging and known in advance,
    the winnow can use shortcuts unavailable to the queue.  You'll see
    that four arguments are needed for instance construction:

        def __init__(self, length, op, abject, target, seq=[])

    Length is an integer indicating how many items you want to
    collect.  Op is a comparison function, taking two arguments and
    returning true if the first argument is better than the second.
    To select the lowest, you would probably use operator.lt.

    Abject is a value beneath consideration, something worse than all
    the values you want to consider.  Target is abject's mirror, an
    unachievably good value.  This sets up a precondition - you have
    to know just a little about your data.

    The heap is initialised thus:

        self.heap = [abject] * length + [target]

    which already meets the heap invariant, saving the trouble of
    heapification.  As data is thrown against the heap, it will fill
    up with real values.  Winnow.answer returns the heap with abject
    and target values stripped away.  As syntax sugar, __call__ calls
    test and answer in turn.
    """

    def __init__(self, length, op, abject, target, seq=[]):
        """Arguments:
        length - the number of items wanted.
        op     - a comparison operator. op(a, b) should return true if
                 a is better than b, and false if b is better.  If a
                 and b are equally good, then either true or false
                 will work, with differing effects.
        abject - a value such that op(x, abject) will return true for
                 any value of x that you want to catch.
        target - a value such that op(x, target) will return false
                 for any value of x.
        seq    - optional iterable to initialise the heap.
        """
        self.abject = abject
        self.target = target
        self.op = op
        self.length = length
        self.heap = [abject] * length + [target]
        if seq:
            self.test(*seq)

    def test(self, *args):
        """This method examines each argument in turn.  If one is more
        suitable than the worst in the heap, it displaces it and
        triggers some shuffling in the heap to bring the new worst
        item to the fore.
        """
        for x in args:
            if self.op(x, self.heap[0]):
                pos = 0
                while True:
                    child = 2 * pos + 1
                    if child >= self.length:
                        break
                    # step to the worse child; target guards the far end
                    child += self.op(self.heap[child], self.heap[child + 1])
                    if not self.op(x, self.heap[child]):
                        break
                    self.heap[pos] = self.heap[child]
                    pos = child
                self.heap[pos] = x

    def answer(self):
        """Returns a list containing the best results so far.  If fewer
        than <self.length> values have been tested, the list will be
        of corresponding length."""
        return [x for x in self.heap[:-1] if x != self.abject]

    def __call__(self, *args):
        """Both the above methods combined in a convenient syrup.

        >>> from operator import ge
        >>> top5 = winnow(5, ge, -1e+1000, 1e+1000)
        >>> top5(*[x*x for x in range(10)])
        [25, 36, 64, 49, 81]
        >>> top5(45, 22, 777)
        [45, 49, 64, 777, 81]
        """
        self.test(*args)
        return self.answer()


import operator
inf = 1e9999

def lowest(length, seq=[], abject=inf, target=-inf):
    return winnow(length, operator.lt, abject, target, seq)

def highest(length, seq=[], abject=-inf, target=inf):
    return winnow(length, operator.gt, abject, target, seq)


if __name__ == '__main__':
    #simpleminded tests
    import random
    r = list(range(50))
    r2 = r + r
    random.shuffle(r)
    random.shuffle(r2)
    low5 = lowest(5)
    print(low5(*r))    #should contain 0,1,2,3,4 in arbitrary order
    high12 = highest(12, r2)
    print(high12())    #should contain range(44,50) twice over, in arbitrary order.

    def cmp_3s(a, b):
        return str(a).count('3') > str(b).count('3')

    def most_3s(length, seq):
        return winnow(length, cmp_3s, '', '3'*50, seq)

    print(most_3s(5, r2)())  #should contain two 33s and three numbers with one 3
```
When there are many more candidates than available positions, the time taken usually approaches a linear relationship to the number of candidates, because most new candidates will be rejected at the first comparison. The worst case is O(n log k), where n is the number of candidates and k is the size of the elite. It occurs when each candidate is better than its predecessors, so that the inner loop of winnow.test is exercised for every one.
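To make the two extremes concrete, here is a minimal sketch that repeats the sifting loop of winnow.test as a standalone function and counts how many candidates get past the first comparison. It is not part of the recipe: the name count_accepted, the default of operator.gt and the infinite sentinels are assumptions chosen for illustration.

```python
import operator

def count_accepted(k, candidates, op=operator.gt,
                   abject=float('-inf'), target=float('inf')):
    """Return (best values found, number of candidates that entered the heap).

    Hypothetical helper: same sifting logic as winnow.test, plus a counter.
    """
    heap = [abject] * k + [target]
    accepted = 0
    for x in candidates:
        if not op(x, heap[0]):
            continue                      # rejected at the first comparison
        accepted += 1
        pos = 0
        while True:
            child = 2 * pos + 1
            if child >= k:
                break
            # step to the worse of the two children
            if op(heap[child], heap[child + 1]):
                child += 1
            if not op(x, heap[child]):
                break
            heap[pos] = heap[child]
            pos = child
        heap[pos] = x
    return [v for v in heap[:-1] if v != abject], accepted

n, k = 1000, 5

# Ascending input: every candidate beats its predecessors (the worst case).
best, hits = count_accepted(k, range(n))
print(sorted(best), hits)

# Descending input: only the first k candidates ever enter the heap.
best, hits = count_accepted(k, range(n - 1, -1, -1))
print(sorted(best), hits)
```

With ascending input every one of the n candidates is sifted into the heap, while with descending input only the first k are; shuffled input falls somewhere in between.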
The simplest way to achieve the same effect would be to sort and slice:
```python
def simpleWinnow(k, p):
    p.sort()
    #p.reverse()  #depending on which end you want.
    return p[:k]
```
This will be best for small, static populations, due to the speed of the builtin sort. But it won't scale well, nor elegantly cope with data that dribbles in slowly.
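For comparison, heapq in Python 2.4 and later provides nsmallest and nlargest, which pick the best few from any iterable. The snippet below is a small sketch with made-up data, not a benchmark against winnow.

```python
import heapq

scores = [31, 4, 15, 92, 65, 35, 89, 79]   # illustrative data only

# Results come back sorted, best first, unlike winnow's heap order.
lowest3 = heapq.nsmallest(3, scores)
highest3 = heapq.nlargest(3, scores)

# Any iterable will do, so the candidates need not be held in a list.
lowest3_squares = heapq.nsmallest(3, (x * x for x in scores))

print(lowest3)           # [4, 15, 31]
print(highest3)          # [92, 89, 79]
print(lowest3_squares)   # [16, 225, 961]
```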
The operator function can return either true or false for equal values, with different but correct results in either case. The example below shows two case-insensitive winnows seeking the 3 lowest letters.
```python
>>> le = winnow(3, lambda a,b: a.lower() <= b.lower(), 'z', ' ')
>>> lt = winnow(3, lambda a,b: a.lower() < b.lower(), 'z', ' ')
>>> for z in "abcCdcBA":
...     print('%2s %20s %20s' % (z, lt(z), le(z)))
...
 a                ['a']                ['a']
 b           ['a', 'b']           ['b', 'a']
 c      ['c', 'a', 'b']      ['c', 'b', 'a']
 C      ['c', 'a', 'b']      ['C', 'b', 'a']
 d      ['c', 'a', 'b']      ['C', 'b', 'a']
 c      ['c', 'a', 'b']      ['c', 'b', 'a']
 B      ['B', 'a', 'b']      ['b', 'B', 'a']
 A      ['b', 'a', 'A']      ['B', 'A', 'a']
```
The first column is the candidate letter, the second column is a winnow using a less-than function, and the third uses less-than-or-equal. Once both are filled up, in the 4th row, they meet a candidate equal to their worst. The lt winnow ignores it, while le absorbs it. Then again in row 6, le swaps two equal values. In row 7, both take in the B, as they should, but le propagates it down to the bottom of the heap, while lt leaves it at the top. Thus both comparisons work, but le does unnecessary work.
The target argument is of dubious benefit. All it does is save a bounds check in winnow.test. Without it, every inner cycle of the test method would need to check a boundary which is rarely exceeded. You would need this:
```python
if child + 1 < self.length and \
   self.op(self.heap[child], self.heap[child + 1]):
    child += 1
```

instead of

```python
if self.op(self.heap[child], self.heap[child + 1]):
    child += 1
```
or, to be overly concise:

```python
child += self.op(self.heap[child], self.heap[child + 1])
```
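As a quick check that the sentinel and the bounds check pick the same child, here is a sketch using two hypothetical helpers (worse_child_checked and worse_child_sentinel are illustrative names, not part of the recipe). The heap has an even length, so its last parent has only one real child, which is exactly where the two forms differ in approach.

```python
import operator

def worse_child_checked(heap, length, child, op):
    """Pick the worse child using an explicit bounds check (no sentinel)."""
    if child + 1 < length and op(heap[child], heap[child + 1]):
        child += 1
    return child

def worse_child_sentinel(heap, child, op):
    """Pick the worse child, relying on a target value stored at heap[length]."""
    if op(heap[child], heap[child + 1]):
        child += 1
    return child

op = operator.gt                        # "better" means larger here
values = [1, 4, 3, 9, 5, 8]             # worst item first; position 2 has one child
with_target = values + [float('inf')]   # same heap with a target sentinel appended

for pos in range(len(values)):
    child = 2 * pos + 1
    if child >= len(values):
        break
    checked = worse_child_checked(values, len(values), child, op)
    guarded = worse_child_sentinel(with_target, child, op)
    assert checked == guarded
    print(pos, checked)
```

The sentinel version does one comparison where the checked version does a length test and usually a comparison as well, which is the whole of the saving.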
abject is there to ease the creation of a fixed length heap in the absence of real values. It can also be used to set a threshold, outside of which values will be rejected out of hand. For instance

```python
winnow(10, operator.gt, 7, 1e999)
```
will select the 10 highest numbers greater than 7. Which might be of use.
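A rough standard-library equivalent of that threshold (Python 2.4 or later) filters before selecting. The data is invented for illustration, and the result comes back sorted rather than in heap order.

```python
import heapq

data = [3, 12, 7, 8, 1, 25, 9, 7.5, 40, 6]   # illustrative values only

# At most 10 of the highest values, ignoring anything not strictly above 7.
top = heapq.nlargest(10, (x for x in data if x > 7))
print(top)   # [40, 25, 12, 9, 8, 7.5]
```

Note that 7 itself is excluded, just as operator.gt would reject a candidate equal to abject.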