Welcome, guest | Sign In | My Account | Store | Cart
from collections import deque
from random import random
from math import log

class Infinity(object):
    'Sentinel object that always compares greater than another object'
    def __cmp__(self, other):
        return 1

def running_median(n, iterable, len=len, min=min, int=int, log=log, random=random):
    'Fast running median with O(lg n) updates where n is the window size'

    maxlevels = int(log(n, 2)) + 1
    bottom_to_top = list(range(maxlevels))
    top_to_bottom = bottom_to_top[::-1]

    VALUE, NEXT, WIDTH = 0, 1, 2                # Components of a node list
    NIL = [Infinity(), [], []]                  # Singleton terminator node
    head = ['HEAD', [NIL] * maxlevels, [1] * maxlevels]
    staircase = [None] * maxlevels

    queue = deque()
    queue_append, queue_popleft = queue.append, queue.popleft
    midpoint = n // 2
    oldnode = None
    for newelem in iterable:
        # staircase: first node on each level where node[NEXT][level][VALUE] > newelem
        queue_append(newelem)
        stair_width = [0] * maxlevels
        node = head
        for level in top_to_bottom:
            while not newelem < node[NEXT][level][VALUE]:
                stair_width[level] += node[WIDTH][level]
                node = node[NEXT][level]
            staircase[level] = node

        # make a new node or reuse one that was previously removed
        if oldnode is None:
            d = min(maxlevels, 1 - int(log(random(), 2.0)))
            newnode = [newelem, [None]*d, [None]*d]
        else:
            newnode = oldnode
            newnode[VALUE] = newelem
            d = len(newnode[NEXT])

        # insert a link to the newnode at each level
        steps = 0
        for level in bottom_to_top[:d]:
            prevnode = staircase[level]
            newnode[NEXT][level] = prevnode[NEXT][level]
            prevnode[NEXT][level] = newnode
            newnode[WIDTH][level] = prevnode[WIDTH][level] - steps
            prevnode[WIDTH][level] = steps
            steps += stair_width[level]
        for level in bottom_to_top:
            prevnode = staircase[level]
            prevnode[WIDTH][level] += 1
        
        if len(queue) >= n:
            # find and yield the midpoint value
            i = midpoint + 1
            node = head
            for level in top_to_bottom:
                while node[WIDTH][level] <= i:
                    i -= node[WIDTH][level]
                    node = node[NEXT][level]
            yield node[VALUE]

            # staircase: first node on each level where node[NEXT][level][VALUE] >= oldelem
            oldelem = queue_popleft()
            node = head
            for level in top_to_bottom:
                while node[NEXT][level][VALUE] < oldelem:
                    node = node[NEXT][level]
                staircase[level] = node
            oldnode = staircase[0][NEXT][0]     # node where oldnode[VALUE] is oldelem

            # remove links to the oldnode
            d = len(oldnode[NEXT])
            for level in bottom_to_top[:d]:
                prevnode = staircase[level]
                prevnode[WIDTH][level] += oldnode[WIDTH][level]
                prevnode[NEXT][level] = oldnode[NEXT][level]
            for level in bottom_to_top:
                prevnode = staircase[level]
                prevnode[WIDTH][level] -= 1


if __name__ == '__main__':

    ###########################################################################
    # Demonstrate the running_median() generator
    # Compare results to an alternative generator
    # implemented by sorting a regular list.

    from bisect import insort
    from random import randrange
    from itertools import islice

    def running_median_slow(n, iterable):
        'Slow running-median with O(n) updates where n is the window size'
        it = iter(iterable)
        queue = deque(islice(it, n))
        sortedlist = sorted(queue)
        midpoint = len(queue) // 2
        yield sortedlist[midpoint]
        for newelem in it:
            oldelem = queue.popleft()
            sortedlist.remove(oldelem)
            queue.append(newelem)
            insort(sortedlist, newelem)
            yield sortedlist[midpoint]

    M, N, window = 9000, 80000, 10001
    data = [randrange(M) for i in range(N)]
    result = list(running_median(window, data))
    expected = list(running_median_slow(window, data))
    assert result == expected
    print 'Successful test of RunningMedian() with', N,
    print 'items and a window of size', window, '\n'

History

  • revision 5 (13 years ago)
  • previous revisions are not available