The winnow class uses a heap to find the best few out of many items. For this job it is quicker and shorter than Python 2.3's heapq module, which is aimed at queuing rather than sifting. On the other hand, it is unlikely to have any advantage over Python 2.4's heapq, which (I hear) has expanded functionality and is implemented in C.
class winnow:
    """A heap structure allows you to easily keep track of one extreme of a
    set of values, at the expense of most other forms of order. The
    theory is explained in many places, including the heapq documentation
    (http://www.python.org/doc/2.3.4/lib/node162.html).

    The winnow heap is implemented as a fixed length list with the worst
    item at position 0. Candidates worse than item 0 are ignored; when
    one is better, item 0 is dropped and the worst remaining item is brought
    to the front. This happens in winnow.test.

    Because the length of its heap is unchanging and known in advance, the
    winnow can use shortcuts unavailable to the queue. You'll see that four
    arguments are needed for instance construction:

        def __init__(self, length, op, abject, target, seq=[])

    Length is an integer indicating how many items you want to collect.
    Op is a comparison function, taking two arguments and returning true if
    the first argument is better than the second. To select the lowest, you
    would probably use operator.lt. Abject is a value beneath
    consideration, something worse than all the values you want to consider.
    Target is abject's mirror, an unachievably good value. This sets up a
    precondition - you have to know just a little about your data.

    The heap is initialised thus:

        self.heap = [abject] * length + [target]

    which already meets the heap invariant, saving the trouble of
    heapification. As data is thrown against the heap, it will fill up with
    real values.

    Winnow.answer returns the heap with abject and target values stripped
    away. As syntax sugar, __call__ calls test and answer in turn.
    """
    def __init__(self, length, op, abject, target, seq=[]):
        """Arguments:
        length - the number of items wanted.
        op - a comparison operator. op(a, b) should return true if a is
             better than b, and false if b is better. If a and b are equally
             good, then either true or false will work, with differing
             effects.
        abject - a value such that op(abject, x) will return false for any
             value of x that you want to catch.
        target - a value such that op(x, target) will return false for
             any value of x.
        seq - optional iterable to initialise the heap.
        """
        self.abject = abject
        self.target = target
        self.op = op
        self.length = length
        # One extra slot holds the target sentinel, so that test() can
        # compare heap[child] with heap[child + 1] without a bounds check.
        self.heap = [abject] * length + [target]
        if seq:
            self.test(*seq)

    def test(self, *args):
        """This method examines each argument in turn. If one is more
        suitable than the worst in the heap, it displaces it and triggers
        some shuffling in the heap to bring the new worst item to the fore.
        """
        for x in args:
            if self.op(x, self.heap[0]):
                pos = 0
                while True:
                    child = 2 * pos + 1
                    if child >= self.length:
                        break
                    # Step to the worse of the two children.
                    child += self.op(self.heap[child], self.heap[child + 1])
                    if not self.op(x, self.heap[child]):
                        break
                    self.heap[pos] = self.heap[child]
                    pos = child
                self.heap[pos] = x

    def answer(self):
        """Returns a list containing the best results so far. If fewer
        than <self.length> values have been tested, the list will be
        correspondingly shorter."""
        return [x for x in self.heap[:-1] if x != self.abject]

    def __call__(self, *args):
        """Both the above methods combined in a convenient syrup.

        >>> from operator import ge
        >>> top5 = winnow(5, ge, -1e+1000, 1e+1000)
        >>> top5(*[x*x for x in range(10)])
        [25, 36, 64, 49, 81]
        >>> top5(45, 22, 777)
        [45, 49, 64, 777, 81]
        """
        self.test(*args)
        return self.answer()
import operator

inf = 1e9999

def lowest(length, seq=[], abject=inf, target=-inf):
    return winnow(length, operator.lt, abject, target, seq)

def highest(length, seq=[], abject=-inf, target=inf):
    return winnow(length, operator.gt, abject, target, seq)

if __name__ == '__main__':
    # simpleminded tests
    r = range(50)
    r2 = r + r
    import random
    random.shuffle(r)
    random.shuffle(r2)

    low5 = lowest(5)
    print low5(*r)
    # should contain 0,1,2,3,4 in arbitrary order

    high12 = highest(12, r2)
    print high12()
    # should contain range(44,50) twice over, in arbitrary order.

    def cmp_3s(a, b):
        return str(a).count('3') > str(b).count('3')

    def most_3s(length, seq):
        return winnow(length, cmp_3s, '', '3'*50, seq)

    print most_3s(5, r2)()
    # should contain two 33s and three numbers with one 3
When there are many more candidates than available positions, the running time is usually close to linear in the number of candidates, because most new candidates are rejected at the first comparison. The worst case is O(n log k), where n is the number of candidates and k is the size of the elite. It occurs when each candidate is better than its predecessors, so that the inner loop of winnow.test is exercised for every one.
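The early-rejection claim can be checked with a small experiment. The sketch below is written for Python 3 and uses a hypothetical stand-alone function, keep_best, that mirrors the loop in winnow.test and counts how many candidates get past the first comparison. The function name, the counter and the test data are assumptions made for illustration, not part of the recipe.

```python
import operator
import random

def keep_best(k, items, better, abject, target):
    """Hypothetical stand-alone mirror of winnow.test that also counts sifts."""
    heap = [abject] * k + [target]   # worst-first heap plus target sentinel
    sifts = 0
    for x in items:
        if not better(x, heap[0]):
            continue                 # rejected at the first comparison
        sifts += 1
        pos = 0
        while True:
            child = 2 * pos + 1
            if child >= k:
                break
            if better(heap[child], heap[child + 1]):
                child += 1           # step to the worse of the two children
            if not better(x, heap[child]):
                break
            heap[pos] = heap[child]
            pos = child
        heap[pos] = x
    return [x for x in heap[:-1] if x != abject], sifts

inf = float("inf")
data = list(range(10000))            # ascending order: the worst case for "highest"

random.seed(0)
shuffled = data[:]
random.shuffle(shuffled)

best, sifts_random = keep_best(5, shuffled, operator.gt, -inf, inf)
_, sifts_sorted = keep_best(5, data, operator.gt, -inf, inf)

print(sorted(best))
print("sifts on shuffled input:", sifts_random)
print("sifts on ascending input:", sifts_sorted)
```

On ascending input every candidate beats the current worst, so every one of the 10000 triggers a sift. On shuffled input only a small fraction should.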
The simplest way to achieve the same effect would be to sort and slice:
    def simpleWinnow(k, p):
        p.sort()
        #p.reverse() #depending on which end you want.
        return p[:k]
This will be best for small, static populations, due to the speed of the builtin sort. But it won't scale well, nor elegantly cope with data that dribbles in slowly.
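For readers on Python 2.4 or later, the standard library offers a middle path: heapq.nsmallest and heapq.nlargest accept any iterable and return the k best items without sorting the whole population. The sketch below is written for Python 3. The trickle generator and its values are made up for illustration, standing in for data that arrives one item at a time.

```python
import heapq

def trickle():
    """Hypothetical data source that yields one value at a time."""
    for x in [17, 3, 42, 8, 25, 1, 30]:
        yield x

lowest3 = heapq.nsmallest(3, trickle())    # results come back sorted, best first
highest3 = heapq.nlargest(3, trickle())

print(lowest3)    # [1, 3, 8]
print(highest3)   # [42, 30, 25]
```

Unlike a winnow instance, these functions keep no state between calls, so they suit a single pass over an iterable rather than repeated calls as new values arrive.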
The operator function can return either true or false for equal values, with different but correct results in either case. The example below shows two case-insensitive winnows seeking the 3 lowest letters.
    >>> le = winnow(3, lambda a,b: a.lower() <= b.lower(), 'z', ' ')
    >>> lt = winnow(3, lambda a,b: a.lower() < b.lower(), 'z', ' ')
    >>> for z in "abcCdcBA":
    ...     print '%2s %20s %20s' % (z, lt(z), le(z))
    ...
     a                ['a']                ['a']
     b           ['a', 'b']           ['b', 'a']
     c      ['c', 'a', 'b']      ['c', 'b', 'a']
     C      ['c', 'a', 'b']      ['C', 'b', 'a']
     d      ['c', 'a', 'b']      ['C', 'b', 'a']
     c      ['c', 'a', 'b']      ['c', 'b', 'a']
     B      ['B', 'a', 'b']      ['b', 'B', 'a']
     A      ['b', 'a', 'A']      ['B', 'A', 'a']
The first column is the candidate letter, the second column is a winnow using a less-than function, and the third uses less-than-or-equal. Once both are filled up, in the 4th row, they meet a candidate equal to their worst. The lt winnow ignores it, while le absorbs it. Then again in row 6, le swaps two equal values. In row 7, both take in the B, as they should, but le propagates it down to the bottom of the heap, while lt leaves it at the top. Thus both comparisons work, but le does unnecessary work.
The target argument is of dubious benefit. All it does is save a bounds check in winnow.test. Without it, every inner cycle of the test method would need to check a boundary which is rarely exceeded. You would need this:
    if child + 1 < self.length and \
       self.op(self.heap[child], self.heap[child + 1]):
        child += 1
instead of
    if self.op(self.heap[child], self.heap[child + 1]):
        child += 1
or, to be overly concise
    child += self.op(self.heap[child], self.heap[child + 1])
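To make the trade-off concrete, here is a minimal sketch of the sifting loop with no target sentinel at all, written for Python 3. The function name offer and the sample data are hypothetical; the heap is a plain list of abject values with nothing appended, so the explicit bounds check does the job the sentinel did.

```python
import operator

def offer(heap, x, better):
    """Offer x to a worst-first heap that has no trailing target sentinel."""
    length = len(heap)
    if not better(x, heap[0]):
        return                       # no better than the current worst
    pos = 0
    while True:
        child = 2 * pos + 1
        if child >= length:
            break
        # The explicit bounds check stands in for the target sentinel.
        if child + 1 < length and better(heap[child], heap[child + 1]):
            child += 1
        if not better(x, heap[child]):
            break
        heap[pos] = heap[child]
        pos = child
    heap[pos] = x

heap = [float("inf")] * 4            # abject values only, no target appended
for x in [9, 2, 7, 5, 1, 8, 3]:
    offer(heap, x, operator.lt)

print(sorted(heap))                  # [1, 2, 3, 5]
```

The cost is one extra integer comparison per level of the heap. The gain is one fewer constructor argument and one fewer thing to know about the data in advance.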
The abject argument is there to ease the creation of a fixed length heap in the absence of real values. It can also be used to set a threshold, beyond which values will be rejected out of hand. For instance
    winnow(10, operator.gt, 7, 1e999)
will select the 10 highest numbers greater than 7. Which might be of use.
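As a cross-check on what that threshold means, the same selection can be written with the standard library instead of winnow. This is a sketch for Python 3 with made-up sample data. Note that a value equal to the threshold is rejected, just as operator.gt(7, 7) is false in the winnow.

```python
import heapq

# Hypothetical sample data: eleven values exceed 7, and one value equals 7.
data = [3, 12, 7, 9, 15, 1, 8, 20, 7.5, 11, 30, 2, 14, 10, 13, 6]

# Discard everything at or below the threshold, then keep the 10 highest.
top = heapq.nlargest(10, (x for x in data if x > 7))

print(top)   # [30, 20, 15, 14, 13, 12, 11, 10, 9, 8]
```

If fewer than 10 values pass the threshold, the result is simply shorter, which matches the way winnow.answer strips unused abject slots.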