Maintains sorted data as new elements are added and old ones are removed as a sliding window advances over a stream of data. Also gives fast indexed access to values. Running time per median update is proportional to the log of the window size.
from random import random
from math import log, ceil
class Node(object):
    'One skiplist entry: a value plus per-level forward links and link widths.'
    __slots__ = ('value', 'next', 'width')

    def __init__(self, value, next, width):
        self.value = value
        self.next = next      # list of forward links, one per level
        self.width = width    # items spanned by each forward link
class End(object):
    'Sentinel object that always compares greater than another object'
    # __cmp__ covers Python 2; the rich comparison methods below keep the
    # sentinel working on Python 3, where __cmp__ is ignored.  To match the
    # original cmp()==1 semantics exactly, equality is always False (the
    # sentinel is not even equal to itself) and inequality always True.
    def __cmp__(self, other):
        return 1
    def __gt__(self, other):
        return True
    def __ge__(self, other):
        return True
    def __lt__(self, other):
        return False
    def __le__(self, other):
        return False
    def __eq__(self, other):
        return False
    def __ne__(self, other):
        return True
    # Defining __eq__ would otherwise disable hashing on Python 3.
    __hash__ = object.__hash__
# Shared tail sentinel: End() compares greater than every real value, so a
# search can never run past it; its next/width lists are empty because
# nothing follows the terminator.
NIL = Node(End(), [], [])              # Singleton terminator node
class IndexableSkiplist:
    'Sorted collection supporting O(lg n) insertion, removal, and lookup by rank.'

    def __init__(self, expected_size=100):
        # Level count sized so an expected_size collection averages O(log n)
        # link traversals per operation.  max() guards log(0) crashes when
        # expected_size is zero.
        self.size = 0
        self.maxlevels = int(1 + log(max(expected_size, 1), 2))
        self.head = Node('HEAD', [NIL]*self.maxlevels, [1]*self.maxlevels)

    def __len__(self):
        return self.size

    def __getitem__(self, i):
        'Return the i-th smallest value; supports negative indices.'
        if i < 0:
            i += self.size
        if not 0 <= i < self.size:
            # Without this check an out-of-range rank silently returned the
            # head marker or the End() sentinel instead of failing.
            raise IndexError('skiplist index out of range')
        node = self.head
        i += 1                       # widths include the step onto each node
        for level in reversed(range(self.maxlevels)):
            while node.width[level] <= i:
                i -= node.width[level]
                node = node.next[level]
        return node.value

    def insert(self, value):
        'Add value, keeping sort order (insertion is stable for duplicates).'
        # find first node on each level where node.next[levels].value > value
        chain = [None] * self.maxlevels
        steps_at_level = [0] * self.maxlevels
        node = self.head
        for level in reversed(range(self.maxlevels)):
            while node.next[level].value <= value:
                steps_at_level[level] += node.width[level]
                node = node.next[level]
            chain[level] = node
        # Draw a geometric number of levels for the new node.  1.0 - random()
        # lies in (0.0, 1.0], so log() never sees zero -- plain random() can
        # return exactly 0.0 and raise a math domain error.
        d = min(self.maxlevels, 1 - int(log(1.0 - random(), 2.0)))
        newnode = Node(value, [None]*d, [None]*d)
        # insert a link to the newnode at each level
        steps = 0
        for level in range(d):
            prevnode = chain[level]
            newnode.next[level] = prevnode.next[level]
            prevnode.next[level] = newnode
            newnode.width[level] = prevnode.width[level] - steps
            prevnode.width[level] = steps + 1
            steps += steps_at_level[level]
        # Links that pass over the new node just span one more item.
        for level in range(d, self.maxlevels):
            chain[level].width[level] += 1
        self.size += 1

    def remove(self, value):
        'Remove one occurrence of value; raise KeyError if it is absent.'
        # find first node on each level where node.next[levels].value >= value
        chain = [None] * self.maxlevels
        node = self.head
        for level in reversed(range(self.maxlevels)):
            while node.next[level].value < value:
                node = node.next[level]
            chain[level] = node
        if value != chain[0].next[0].value:
            raise KeyError('Not Found')
        # remove one link at each level; d is the doomed node's level count
        d = len(chain[0].next[0].next)
        for level in range(d):
            prevnode = chain[level]
            prevnode.width[level] += prevnode.next[level].width[level] - 1
            prevnode.next[level] = prevnode.next[level].next[level]
        for level in range(d, self.maxlevels):
            chain[level].width[level] -= 1
        self.size -= 1

    def __iter__(self):
        'Iterate over values in sorted order'
        node = self.head.next[0]
        while node is not NIL:
            yield node.value
            node = node.next[0]
from collections import deque
from itertools import islice
class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        # Prime the window with the first n items and mirror them in the
        # skiplist so rank lookups are O(log n).
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for value in self.queue:
            self.skiplist.insert(value)

    def __iter__(self):
        'Yield the window median at each successive position of the window.'
        window = self.queue
        ordered = self.skiplist
        mid = len(window) // 2
        yield ordered[mid]
        for incoming in self.it:
            outgoing = window.popleft()
            ordered.remove(outgoing)
            window.append(incoming)
            ordered.insert(incoming)
            yield ordered[mid]
if __name__ == '__main__':
    #########################################################################
    # Demonstrate the RunningMedian() class
    # Compare results to alternative class implemented using a regular list
    # (the fast and slow versions must agree on every window's median)
    from bisect import insort
    from random import randrange
class RunningMedianSlow:
    'Slow running-median with O(n) updates where n is the window size'

    def __init__(self, n, iterable):
        # Keep the raw window in arrival order and a parallel sorted copy.
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.sortedlist = sorted(self.queue)

    def __iter__(self):
        'Yield the median as the window slides over the remaining stream.'
        window = self.queue
        ordered = self.sortedlist
        mid = len(window) // 2
        yield ordered[mid]
        for incoming in self.it:
            outgoing = window.popleft()
            ordered.remove(outgoing)
            window.append(incoming)
            insort(ordered, incoming)
            yield ordered[mid]
# M: value range, N: stream length, window: sliding-window width (odd, so
# the median is a single middle element).  Any disagreement between the
# fast and slow implementations trips the assert.
M, N, window = 5000, 8000, 1001
data = [randrange(M) for i in range(N)]
result = list(RunningMedian(window, data))
expected = list(RunningMedianSlow(window, data))
assert result == expected
print 'Successful test of RunningMedian() with', N,
print 'items and a window of size', window, '\n'
#########################################################################
# Demonstrate and test the IndexableSkiplist() collection class
# Items are inserted in shuffled order and must come back out sorted.
from random import shuffle
demo = list('FOURSQUARE')
excluded = list('XZ')
print 'Building an indexable skiplist containing %r:' % ''.join(demo)
s = IndexableSkiplist(len(demo))        # exercise __init__()
shuffle(demo)
for char in demo:
    s.insert(char)                      # check insert()
    print 'Adding: ', char, list(s), len(s)
for char in excluded:
    print 'Attempting to remove:', char, '-->',
    try:
        s.remove(char)                  # check remove() item not in skiplist
    except KeyError:
        print 'Successfully not found'
assert len(s) == len(demo)              # check __len__()
assert list(s) == sorted(demo)          # check __iter__()
assert [s[i] for i in range(len(s))] == sorted(demo)    # check __getitem__()
shuffle(demo)
for char in demo:
    s.remove(char)                      # check remove() item in skiplist
    print 'Removing: ', char, list(s), len(s)
assert list(s) == [] and len(s) == 0    # everything removed cleanly
#############################################################################
# Show the IndexableSkiplist's internal structure and describe its invariants
def iter_levels(skiplist, level):
    'Yield each node (head included) while walking one level front to back.'
    current = skiplist.head
    while current is not NIL:
        yield current
        current = current.next[level]
demo = list('HOPSCOTCH') # create a sample skiplist
s = IndexableSkiplist(len(demo))
for char in demo:
s.insert(char)
print '\nStructure of an indexable skiplist containing %r:' % ''.join(demo)
for i in reversed(range(s.maxlevels)): # show the link structure on each level
print 'Level %d: ' % i,
line = ''
for n in iter_levels(s, i):
line += '%s%d %s' % (n.value, n.width[i], ' ' * (n.width[i] - 1))
print line + 'NIL'
print '\nNote the following structure invariants:'
print '* Level 0 has widths all equal to one.'
print '* The sum of widths on each level == len(skiplist)+1.'
print '* Entries on higher levels are subsets of the lower levels.'
print '* Widths of entries on higer levels sum to the widths below.'
print ' For example, a P --> T link of two steps on one level'
print ' corresponds to P --> S --> T links of one step each on level 0.'
|
The story behind this recipe is recounted at: http://rhettinger.wordpress.com/2010/02/06/lost-knowledge/
Tested under Python 2.5 and 2.6
A skiplist, http://en.wikipedia.org/wiki/Skip_list, is a "data structure for storing a sorted list of items, using a hierarchy of linked lists that connect increasingly sparse subsequences of the items."
I've augmented the basic structure so that each of the links is accompanied by a width which tracks how many items are spanned by higher level links. The widths allow the skiplist to be indexable using the __getitem__() method.
The three major operations all run at O(log n) speed:
- insert value
- remove value
- lookup value by rank
Listing or iterating the structure yields the entries in sorted order. The sort is stable.
This structure is perfect for solving problems like tracking a running median which requires sort order to be maintained as new items are added and old items are deleted and which requires fast access to the n-th item to find the median or quartiles.
The O(log n) update times beat other published solutions such as "Efficient Algorithm for Computing a Running Median" by Soumya D. Mohanty whose algorithm's update speed is O(sqrt(n)).
This recipe scales nicely and can handle million element sliding windows.
Here is sample output from the test code in the above recipe:
Successful test of RunningMedian() with 8000 items and a window of size 1001
Building an indexable skiplist containing 'FOURSQUARE':
Adding: A ['A'] 1
Adding: O ['A', 'O'] 2
Adding: R ['A', 'O', 'R'] 3
Adding: S ['A', 'O', 'R', 'S'] 4
Adding: U ['A', 'O', 'R', 'S', 'U'] 5
Adding: U ['A', 'O', 'R', 'S', 'U', 'U'] 6
Adding: E ['A', 'E', 'O', 'R', 'S', 'U', 'U'] 7
Adding: Q ['A', 'E', 'O', 'Q', 'R', 'S', 'U', 'U'] 8
Adding: F ['A', 'E', 'F', 'O', 'Q', 'R', 'S', 'U', 'U'] 9
Adding: R ['A', 'E', 'F', 'O', 'Q', 'R', 'R', 'S', 'U', 'U'] 10
Attempting to remove: X --> Successfully not found
Attempting to remove: Z --> Successfully not found
Removing: A ['E', 'F', 'O', 'Q', 'R', 'R', 'S', 'U', 'U'] 9
Removing: Q ['E', 'F', 'O', 'R', 'R', 'S', 'U', 'U'] 8
Removing: R ['E', 'F', 'O', 'R', 'S', 'U', 'U'] 7
Removing: U ['E', 'F', 'O', 'R', 'S', 'U'] 6
Removing: O ['E', 'F', 'R', 'S', 'U'] 5
Removing: E ['F', 'R', 'S', 'U'] 4
Removing: F ['R', 'S', 'U'] 3
Removing: U ['R', 'S'] 2
Removing: R ['S'] 1
Removing: S [] 0
Structure of an indexable skiplist containing 'HOPSCOTCH':
Level 3: HEAD1 C9 NIL
Level 2: HEAD1 C3 H6 NIL
Level 1: HEAD1 C1 C1 H1 H1 O3 S2 NIL
Level 0: HEAD1 C1 C1 H1 H1 O1 O1 P1 S1 T1 NIL
Note the following structure invariants:
* Level 0 has widths all equal to one.
* The sum of widths on each level is len(skiplist) + 1.
* Entries on higher levels are subsets of the lower levels.
* Widths of entries on higher levels sum to the widths below.
For example, a P --> T link of two steps on one level
corresponds to P --> S --> T links of one step each on level 0.
Seems great! Just what I was looking for. I understand it only works in python 3 though. Is there a way to have it run under python 2.5 ? I would do it but I lack the competence to do so. Under 2.5, when I do a "python running_median.py" it says :
running_median.py:7: DeprecationWarning: integer argument expected, got float return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] Traceback (most recent call last): File "running_median.py", line 85, in <module> r = RunningMedian(islice(it, window_size)) File "running_median.py", line 32, in __init__ self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences File "running_median.py", line 7, in grouper return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] TypeError: slice indices must be integers or None or have an __index__ method
Looks like the __index__ method is not implemented but I don't know what to do from here.
Updated so that it works under Python 2.5, 2.6, and 3.1. The change was to wrap the floor() call in int(). In Py3.1 floor() returns an integer; in Py2.5 and 2.6 it returns a float.
Faster version - O(log**2 n) update() using blist
Using the blist library (a list implemented with B+trees having O(log n) lookup, insertion and deletion), it is possible to implement this in a faster, and simpler, way. Searches with bisect_left and insort are O(log**2 n).
With some quick benchmarking, the blist version is 2.25 times faster on my laptop using a window size of 10,000; this speedup grows to 3.90x with a 100,000 window size.
If the window width is fixed, you can use just a circular queue that is always full and just overwrites the oldest item, updating an index or pointer. It's simpler and probably faster than a full deque made of a linked list of short arrays as the Python one. If the input data is indexable you can just keep an index.
I meant something like this (not tested):
With a fixed window, Python's deque already acts as a circular queue. It is blazingly fast.
Besides, the queue is the fast O(1) component of the algorithm. Optimization efforts should be focused on the O(log n) skiplist insertion/deletion/lookup steps.
Could this be modified to let you set the index value on insert like Redis' sorted sets?
Can someone please explain the algorithm ? don't know Python, so unable to understand.
Rather than using the blist module, try using sortedcontainers and I'll bet it's even faster. It implements sorted list, sorted dict, and sorted set data types in pure-Python and is fast-as-C implementations (even faster!). Learn more about sortedcontainers, available on PyPI and github.
Hey, maybe things have changed in the past five years, but I cannot see any improved speed from using blist.blist. And while sortedcontainers.sortedlist is indeed faster than blist.sortedlist, just using Matteo's bisect_left and insort code with a plain builtin list instead of the blist.blist gives the best performance for me, for window sizes between 100 and 10,000 (for smaller windows, scipy.ndimage.filters.median_filter is faster).
In particular, these are the timings I get (Python 2.7.6, numpy 1.8.2, scipy 0.13.3, i7-4770S CPU @ 3.10GHz):
I've put the benchmark code in a github gist -- please let me know if you see anything wrong with it.
Probably the fastest solution would be a scipy.ndimage.filters.median_filter1d maintaining a sorted window in C.
I commented on Jan's github gist but wanted to add the results here.
For window sizes less than ~20,000 the best pure-Python solution is just to use
list
withbisect
module functions. For larger window sizes, thesortedcontainers.SortedList
data type performs fastest.