Welcome, guest | Sign In | My Account | Store | Cart
```from collections import deque
from random import random
from math import log

class Infinity(object):
'Sentinel object that always compares greater than another object'
def __cmp__(self, other):
return 1

def running_median(n, iterable, len=len, min=min, int=int, log=log, random=random):
'Fast running median with O(lg n) updates where n is the window size'

maxlevels = int(log(n, 2)) + 1
bottom_to_top = list(range(maxlevels))
top_to_bottom = bottom_to_top[::-1]

VALUE, NEXT, WIDTH = 0, 1, 2                # Components of a node list
NIL = [Infinity(), [], []]                  # Singleton terminator node
staircase = [None] * maxlevels

queue = deque()
queue_append, queue_popleft = queue.append, queue.popleft
midpoint = n // 2
oldnode = None
for newelem in iterable:
# staircase: first node on each level where node[NEXT][level][VALUE] > newelem
queue_append(newelem)
stair_width = [0] * maxlevels
for level in top_to_bottom:
while not newelem < node[NEXT][level][VALUE]:
stair_width[level] += node[WIDTH][level]
node = node[NEXT][level]
staircase[level] = node

# make a new node or reuse one that was previously removed
if oldnode is None:
d = min(maxlevels, 1 - int(log(random(), 2.0)))
newnode = [newelem, [None]*d, [None]*d]
else:
newnode = oldnode
newnode[VALUE] = newelem
d = len(newnode[NEXT])

# insert a link to the newnode at each level
steps = 0
for level in bottom_to_top[:d]:
prevnode = staircase[level]
newnode[NEXT][level] = prevnode[NEXT][level]
prevnode[NEXT][level] = newnode
newnode[WIDTH][level] = prevnode[WIDTH][level] - steps
prevnode[WIDTH][level] = steps
steps += stair_width[level]
for level in bottom_to_top:
prevnode = staircase[level]
prevnode[WIDTH][level] += 1

if len(queue) >= n:
# find and yield the midpoint value
i = midpoint + 1
for level in top_to_bottom:
while node[WIDTH][level] <= i:
i -= node[WIDTH][level]
node = node[NEXT][level]
yield node[VALUE]

# staircase: first node on each level where node[NEXT][level][VALUE] >= oldelem
oldelem = queue_popleft()
for level in top_to_bottom:
while node[NEXT][level][VALUE] < oldelem:
node = node[NEXT][level]
staircase[level] = node
oldnode = staircase[0][NEXT][0]     # node where oldnode[VALUE] is oldelem

# remove links to the oldnode
d = len(oldnode[NEXT])
for level in bottom_to_top[:d]:
prevnode = staircase[level]
prevnode[WIDTH][level] += oldnode[WIDTH][level]
prevnode[NEXT][level] = oldnode[NEXT][level]
for level in bottom_to_top:
prevnode = staircase[level]
prevnode[WIDTH][level] -= 1

if __name__ == '__main__':

###########################################################################
# Demonstrate the running_median() generator
# Compare results to an alternative generator
# implemented by sorting a regular list.

from bisect import insort
from random import randrange
from itertools import islice

def running_median_slow(n, iterable):
'Slow running-median with O(n) updates where n is the window size'
it = iter(iterable)
queue = deque(islice(it, n))
sortedlist = sorted(queue)
midpoint = len(queue) // 2
yield sortedlist[midpoint]
for newelem in it:
oldelem = queue.popleft()
sortedlist.remove(oldelem)
queue.append(newelem)
insort(sortedlist, newelem)
yield sortedlist[midpoint]

M, N, window = 9000, 80000, 10001
data = [randrange(M) for i in range(N)]
result = list(running_median(window, data))
expected = list(running_median_slow(window, data))
assert result == expected
print 'Successful test of RunningMedian() with', N,
print 'items and a window of size', window, '\n'
```

### History

• revision 5 (14 years ago)
• previous revisions are not available