Fast version of r576930 reimplemented using a list of lists instead of a node class.

Python, 120 lines
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 from collections import deque from random import random from math import log class Infinity(object): 'Sentinel object that always compares greater than another object' def __cmp__(self, other): return 1 def running_median(n, iterable, len=len, min=min, int=int, log=log, random=random): 'Fast running median with O(lg n) updates where n is the window size' maxlevels = int(log(n, 2)) + 1 bottom_to_top = list(range(maxlevels)) top_to_bottom = bottom_to_top[::-1] VALUE, NEXT, WIDTH = 0, 1, 2 # Components of a node list NIL = [Infinity(), [], []] # Singleton terminator node head = ['HEAD', [NIL] * maxlevels, [1] * maxlevels] staircase = [None] * maxlevels queue = deque() queue_append, queue_popleft = queue.append, queue.popleft midpoint = n // 2 oldnode = None for newelem in iterable: # staircase: first node on each level where node[NEXT][level][VALUE] > newelem queue_append(newelem) stair_width = [0] * maxlevels node = head for level in top_to_bottom: while not newelem < node[NEXT][level][VALUE]: stair_width[level] += node[WIDTH][level] node = node[NEXT][level] staircase[level] = node # make a new node or reuse one that was previously removed if oldnode is None: d = min(maxlevels, 1 - int(log(random(), 2.0))) newnode = [newelem, [None]*d, [None]*d] else: newnode = oldnode newnode[VALUE] = newelem d = len(newnode[NEXT]) # insert a link to the newnode at each level steps = 0 for level in bottom_to_top[:d]: prevnode = staircase[level] newnode[NEXT][level] = prevnode[NEXT][level] prevnode[NEXT][level] = newnode newnode[WIDTH][level] = prevnode[WIDTH][level] - steps prevnode[WIDTH][level] = steps steps += stair_width[level] for level in bottom_to_top: prevnode = staircase[level] prevnode[WIDTH][level] += 1 if len(queue) >= n: # find and yield the midpoint value i = midpoint + 1 node = head for level in top_to_bottom: while node[WIDTH][level] <= i: i -= node[WIDTH][level] node = node[NEXT][level] yield node[VALUE] # staircase: first node on each level where node[NEXT][level][VALUE] >= oldelem oldelem = queue_popleft() node = head for level in top_to_bottom: while node[NEXT][level][VALUE] < oldelem: node = node[NEXT][level] staircase[level] = node oldnode = staircase[0][NEXT][0] # node where oldnode[VALUE] is oldelem # remove links to the oldnode d = len(oldnode[NEXT]) for level in bottom_to_top[:d]: prevnode = staircase[level] prevnode[WIDTH][level] += oldnode[WIDTH][level] prevnode[NEXT][level] = oldnode[NEXT][level] for level in bottom_to_top: prevnode = staircase[level] prevnode[WIDTH][level] -= 1 if __name__ == '__main__': ########################################################################### # Demonstrate the running_median() generator # Compare results to an alternative generator # implemented by sorting a regular list. from bisect import insort from random import randrange from itertools import islice def running_median_slow(n, iterable): 'Slow running-median with O(n) updates where n is the window size' it = iter(iterable) queue = deque(islice(it, n)) sortedlist = sorted(queue) midpoint = len(queue) // 2 yield sortedlist[midpoint] for newelem in it: oldelem = queue.popleft() sortedlist.remove(oldelem) queue.append(newelem) insort(sortedlist, newelem) yield sortedlist[midpoint] M, N, window = 9000, 80000, 10001 data = [randrange(M) for i in range(N)] result = list(running_median(window, data)) expected = list(running_median_slow(window, data)) assert result == expected print 'Successful test of RunningMedian() with', N, print 'items and a window of size', window, '\n'

The optimizations include:

• uses native Python lists instead of a custom node class

• folds all components into a single generator

• localizes global lookups (int=int, len=len, etc.)

• avoids attribute lookups, using bound methods instead

• re-uses nodes to avoid allocating and freeing lists

• precomputes the top_to_bottom and bottom_to_top lists instead of making calls to range()

To see the effect of the optimizations:

>>> from dis import dis
>>> dis(running_median)

List indexing has a fast path in CPython where BINARY_SUBSCR for lists and ints is executed in-line with a macro instead of the usual slow dispatch through list.__getitem__().

The combined effects of the optimizations give an almost ten-fold gain over the original recipe (when run with a large window sizes).

Virgil Stokes 9 years, 2 months ago

Suppose data has 20,000,000,000 float (64-bit) elements in it --- How could one perform a running median on such a large number of elements?

Grant Jenks 6 years, 3 months ago

Jan has a great comment on the old 576930 recipe benchmarking various options. For large window sizes (> ~20,000) the sortedcontainers SortedList implementation performs fastest. For smaller window sizes, simply using a plain list with bisect_left and insort functions from bisect is faster.

 Created by Raymond Hettinger on Sat, 27 Feb 2010 (MIT)

