Fast version of r576930 reimplemented using a list of lists instead of a node class.
from collections import deque
from random import random
from math import log

class Infinity(object):
    'Sentinel object that always compares greater than another object'
    def __cmp__(self, other):
        return 1

def running_median(n, iterable, len=len, min=min, int=int, log=log, random=random):
    'Fast running median with O(lg n) updates where n is the window size'

    maxlevels = int(log(n, 2)) + 1
    bottom_to_top = list(range(maxlevels))
    top_to_bottom = bottom_to_top[::-1]
    VALUE, NEXT, WIDTH = 0, 1, 2                    # Components of a node list
    NIL = [Infinity(), [], []]                      # Singleton terminator node
    head = ['HEAD', [NIL] * maxlevels, [1] * maxlevels]
    staircase = [None] * maxlevels
    queue = deque()
    queue_append, queue_popleft = queue.append, queue.popleft
    midpoint = n // 2
    oldnode = None

    for newelem in iterable:
        # staircase: first node on each level where node[NEXT][level][VALUE] > newelem
        queue_append(newelem)
        stair_width = [0] * maxlevels
        node = head
        for level in top_to_bottom:
            while not newelem < node[NEXT][level][VALUE]:
                stair_width[level] += node[WIDTH][level]
                node = node[NEXT][level]
            staircase[level] = node

        # make a new node or reuse one that was previously removed
        if oldnode is None:
            d = min(maxlevels, 1 - int(log(random(), 2.0)))
            newnode = [newelem, [None] * d, [None] * d]
        else:
            newnode = oldnode
            newnode[VALUE] = newelem
            d = len(newnode[NEXT])

        # insert a link to the newnode at each level
        steps = 0
        for level in bottom_to_top[:d]:
            prevnode = staircase[level]
            newnode[NEXT][level] = prevnode[NEXT][level]
            prevnode[NEXT][level] = newnode
            newnode[WIDTH][level] = prevnode[WIDTH][level] - steps
            prevnode[WIDTH][level] = steps
            steps += stair_width[level]
        for level in bottom_to_top:
            prevnode = staircase[level]
            prevnode[WIDTH][level] += 1

        if len(queue) >= n:
            # find and yield the midpoint value
            i = midpoint + 1
            node = head
            for level in top_to_bottom:
                while node[WIDTH][level] <= i:
                    i -= node[WIDTH][level]
                    node = node[NEXT][level]
            yield node[VALUE]

            # staircase: first node on each level where node[NEXT][level][VALUE] >= oldelem
            oldelem = queue_popleft()
            node = head
            for level in top_to_bottom:
                while node[NEXT][level][VALUE] < oldelem:
                    node = node[NEXT][level]
                staircase[level] = node
            oldnode = staircase[0][NEXT][0]         # node where oldnode[VALUE] is oldelem

            # remove links to the oldnode
            d = len(oldnode[NEXT])
            for level in bottom_to_top[:d]:
                prevnode = staircase[level]
                prevnode[WIDTH][level] += oldnode[WIDTH][level]
                prevnode[NEXT][level] = oldnode[NEXT][level]
            for level in bottom_to_top:
                prevnode = staircase[level]
                prevnode[WIDTH][level] -= 1

if __name__ == '__main__':

    ###########################################################################
    # Demonstrate the running_median() generator
    # Compare results to an alternative generator
    # implemented by sorting a regular list.

    from bisect import insort
    from random import randrange
    from itertools import islice

    def running_median_slow(n, iterable):
        'Slow running-median with O(n) updates where n is the window size'
        it = iter(iterable)
        queue = deque(islice(it, n))
        sortedlist = sorted(queue)
        midpoint = len(queue) // 2
        yield sortedlist[midpoint]
        for newelem in it:
            oldelem = queue.popleft()
            sortedlist.remove(oldelem)
            queue.append(newelem)
            insort(sortedlist, newelem)
            yield sortedlist[midpoint]

    M, N, window = 9000, 80000, 10001
    data = [randrange(M) for i in range(N)]
    result = list(running_median(window, data))
    expected = list(running_median_slow(window, data))
    assert result == expected
    print 'Successful test of running_median() with', N,
    print 'items and a window of size', window, '\n'
The optimizations include:

* uses native Python lists instead of a custom node class (sketched below)
* folds all components into a single generator
* localizes global lookups (int=int, len=len, etc.)
* avoids attribute lookups by using bound methods instead
* re-uses nodes to avoid allocating and freeing lists
* precomputes the top_to_bottom and bottom_to_top lists instead of making calls to range()
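For contrast, here is a rough sketch of the two node representations (the Node class below is illustrative only, not the exact code from recipe 576930). The point is that indexing a plain list with the VALUE, NEXT, WIDTH constants replaces attribute access in the inner loops:

# Hypothetical node class, roughly in the style of the original recipe:
class Node(object):
    __slots__ = 'value', 'next', 'width'        # attribute access => LOAD_ATTR
    def __init__(self, value, next, width):
        self.value, self.next, self.width = value, next, width

# Equivalent list-of-lists node used by this recipe:
VALUE, NEXT, WIDTH = 0, 1, 2                    # list indexing => BINARY_SUBSCR
node = [42, [], []]                             # node[VALUE], node[NEXT], node[WIDTH]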
To see the effect of the optimizations:
>>> from dis import dis
>>> dis(running_median)
Note the presence of LOAD_CONST and LOAD_FAST instead of LOAD_GLOBAL or LOAD_ATTR.
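To isolate just the name-lookup effect, a throwaway pair of functions (f and g are illustrative names, not part of the recipe) shows how a default-argument binding turns a global lookup into a local one:

>>> def f(seq):
...     return len(seq)
>>> def g(seq, len=len):
...     return len(seq)
>>> dis(f)    # len is fetched with LOAD_GLOBAL on every call
>>> dis(g)    # len is a local (a default argument), fetched with LOAD_FAST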
List indexing has a fast path in CPython where BINARY_SUBSCR for lists and ints is executed in-line with a macro instead of the usual slow dispatch through list.__getitem__().
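A quick way to see that opcode (the lambda is just a stand-in for the recipe's inner-loop expressions):

>>> dis(lambda node: node[1][0])    # each subscript compiles to a BINARY_SUBSCR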
The combined effect of the optimizations is an almost ten-fold gain over the original recipe (when run with large window sizes).
Suppose data has 20,000,000,000 float (64-bit) elements in it. How could one perform a running median over such a large number of elements?
Jan has a great comment on the old 576930 recipe benchmarking various options. For large window sizes (> ~20,000), the sortedcontainers SortedList implementation performs fastest. For smaller window sizes, simply using a plain list with the bisect_left and insort functions from bisect is faster.
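For reference, here is a minimal sketch of that plain-list approach (the name running_median_bisect and its structure are mine, not Jan's benchmark code). It keeps the window in a deque plus a parallel sorted list, using bisect_left to locate the outgoing element instead of list.remove():

from bisect import bisect_left, insort
from collections import deque
from itertools import islice

def running_median_bisect(n, iterable):
    'Running median over a window of size n, keeping the window in a sorted plain list'
    it = iter(iterable)
    window = deque(islice(it, n))
    sortedlist = sorted(window)
    midpoint = n // 2
    yield sortedlist[midpoint]
    for newelem in it:
        oldelem = window.popleft()
        window.append(newelem)
        del sortedlist[bisect_left(sortedlist, oldelem)]    # O(log n) search + O(n) shift
        insort(sortedlist, newelem)                         # O(log n) search + O(n) shift
        yield sortedlist[midpoint]

For the large-window case, the same loop can be written with sortedcontainers.SortedList: keep sl = SortedList(window), then use sl.remove(oldelem), sl.add(newelem), and sl[midpoint].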