Welcome, guest | Sign In | My Account | Store | Cart

The winnow class uses a heap for finding the best few out of several items. At this it is quicker and shorter than python 2.3's heapq module, which is aimed at queuing rather than sifting. OTOH, it is unlikely to have any advantage over 2.4's heapq, which (I hear) has expanded functionality and is implemented in C.

Python, 132 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class winnow:
    """A heap structure allows you to easily keep track of one extreme of a
    set of values, at the expense of most other forms of order.  The
    theory is explained in many places, including the heapq documentation
    (http://www.python.org/doc/2.3.4/lib/node162.html).

    The winnow heap is implemented as a fixed length list with the worst
    item at position 0.  Candidates worse than item 0 are ignored; when
    one is better, item 0 is dropped and the worst remaining item is brought
    to the front.  This happens in winnow.test.

    Because the length of its heap is unchanging and known in advance, the
    winnow can use shortcuts unavailable to the queue.  You'll see that four
    arguments are needed for instance construction:

      def __init__(self, length, op, abject, target, seq=[])

    Length is an integer indicating how many items you want to collect.
    Op is a comparison function, taking two arguments and returning true if
    the first argument is better than the second.  To select the lowest, you
    would use probably use operator.lt.  Abject is a value beneath
    consideration, something worse than all the values you want to consider.
    Target is abject's mirror, an unachievably good value.  This sets up a
    precondition - you have to know just a little about your data.

    The heap is initialised thus:

      self.heap = [abject] * length + [target]

    which already meets the heap invariant, saving the trouble of
    heapification.  As data is thrown against the heap, it  will fill up with
    real values.

    Winnow.answer returns the heap with abject and target values stripped
    away.  As syntax sugar,  __call__ calls test and answer in turn.
    """

    def __init__(self, length, op, abject, target, seq=[]):
        """Arguments:
        length - the number of items wanted.
        op - a comparison operator. op(a, b) should return true if a is 
        better than b, and false if b is best. If a and b are equally good,
        then either true or false will work, with differing effects.
        abject - a value such that op(x, abject) will return false for any
        values of x that you want to catch.
        target - a value such that op(target, x) will return false for 
        any value of x.
        seq - optional iterable to initialise the heap.
        """
        self.abject = abject
        self.target = target
        self.op = op
        self.length = length
        self.heap = [abject] * length + [target]
        if seq:
            self.test(*seq)

    def test(self, *args):
        """This method examines each argument in turn.  If one is more
        suitable than the worst in the heap, it displaces it and triggers
        some shuffling in the heap to bring the new worst item to the fore.
        """
        for x in args:
            if self.op(x, self.heap[0]):
                pos = 0
                while True:
                    child = 2 * pos + 1
                    if child >= self.length:
                        break
                    child += self.op(self.heap[child], self.heap[child + 1])
                    if not self.op(x, self.heap[child]):
                        break
                    self.heap[pos] = self.heap[child]
                    pos = child
                self.heap[pos] = x

    def answer(self):
        """Returns a list containing the best results so far. If less
        than <self.length> values have been tested, the list will be
        of corresponding length."""
        return [ x for x in self.heap[:-1] if x != self.abject ]


    def __call__(self, *args):
        """Both the above methods combined in a convenient syrup.

        >>> from  operator import ge
        >>> top5 = winnow(5, ge, -1e+1000, 1e+1000)
        >>> top5(*[x*x for x in range(10)])
        [25, 36, 64, 49, 81]
        >>> top5(45, 22, 777)
        [45, 49, 64, 777, 81]
        """
        self.test(*args)
        return self.answer()


import operator
inf = 1e9999  

def lowest(length, seq=[], abject=inf, target=-inf):
    return winnow(length, operator.lt, abject, target, seq)

def highest(length, seq=[], abject=-inf, target=inf):
    return winnow(length, operator.gt, abject, target, seq)



if __name__ == '__main__':
    #simpleminded  tests
    r = range(50)
    r2 = r + r
    import random
    random.shuffle(r)
    random.shuffle(r2)

    low5 = lowest(5)
    print low5(*r)
    #should contain 0,1,2,3,4 in arbitrary order

    high12 = highest(12, r2)
    print high12()
    # should contain range(44,50) twice over, in arbitrary order.

    def cmp_3s(a, b):
        return str(a).count('3') > str(b).count('3')

    def most_3s(length, seq):
        return winnow(length, cmp_3s, '', '3'*50, seq)

    print most_3s(5, r2)()
    #should contain two 33s and three numbers with one 3

When there are many more candidates than available positions, the time taken usually approaches a linear relationship to the number of candidates, because most new candidates will be rejected at the first comparison. Put another way, it is O(n log k), where n is the number of candidates and k is the size of the elite. Performance is worst when each candidate is better than its predecessors -- the inner loop of winnow.test is exercised for every one.

The simplest way to achieve the same effect would be to sort and slice:

def simpleWinnow(k, p):
    p.sort()
    #p.reverse() #depending on which end you want.
    return p[:k]

This will be best for small, static populations, due to the speed of the builtin sort. But it won't scale well, nor elegantly cope with data that dribbles in slowly.

The operator function can return either true or false for equal values, with different but correct results in either case. The example below shows two case-insensitive winnows seeking the 3 lowest letters.

>>> le =  winnow(3, lambda a,b: a.lower() <= b.lower(), 'z', ' ')
>>> lt =  winnow(3, lambda a,b: a.lower() < b.lower(), 'z', ' ')
>>> for z in "abcCdcBA":
...     print '%2s %20s %20s' % (z, lt(z), le(z))
...
 a                ['a']                ['a']
 b           ['a', 'b']           ['b', 'a']
 c      ['c', 'a', 'b']      ['c', 'b', 'a']
 C      ['c', 'a', 'b']      ['C', 'b', 'a']
 d      ['c', 'a', 'b']      ['C', 'b', 'a']
 c      ['c', 'a', 'b']      ['c', 'b', 'a']
 B      ['B', 'a', 'b']      ['b', 'B', 'a']
 A      ['b', 'a', 'A']      ['B', 'A', 'a']

The first column is the candidate letter, the second column is a winnow using a less-than function, and the third uses less-than-or-equal. Once both are filled up, in the 4th row, they meet a candidate equal to their worst. The lt winnow ignores it, while le absorbs it. Then again in row 6, le swaps two equal values. In row 7, both take in the B, as they should, but le propagates it down to the bottom of the heap, while lt leaves it at the top. Thus both comparisons work, but le does unnecessary work.

The target argument is of dubious benefit. All it does is save a bounds check in winnow.test. Without it, every inner cycle of the test method would need to check a boundary which is rarely exceeded. You would need this:

if child + 1 < self.length and \
   self.op(self.heap[child], self.heap[child + 1]):
    child += 1

instead of

if self.op(self.heap[child], self.heap[child + 1]):
    child += 1

or, to be overly concise

child += self.op(self.heap[child], self.heap[child + 1])

abject is there to ease the creation of a fixed length heap in the absense of real values. It can also be used to set a threshold, outside of which values will be rejected out of hand. For instance

winnow(10, operator.gt, 7, 1e999)

will select the 10 highest numbers greater than 7. Which might be of use.