Maintains sorted data as new elements are added and old one removed as a sliding window advances over a stream of data. Running time per median calculation is proportional to the square-root of the window size.
| Python |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | from collections import deque
from bisect import insort
def grouper(seq, n):
'Split sequence into n-length deques'
return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)]
class RunningMedian:
''' Updates median value over a sliding window.
Initialize with a window size of data. The call update(elem) with
new values. Returns the new median (for an odd-sized window).
'''
# Gains speed by maintaining a sort while inserting new values
# and deleting the oldest ones.
# Reduction of insert/delete time is done by chunking sorted data
# into subgroups organized as deques, so that appending and popping
# are cheap. Running-time per update is proportional to the
# square-root of the window size.
# Inspired by "Efficient Algorithm for computing a Running Median"
# by Soumya D. Mohanty. Adapted for Python by eliminating the
# map structure, check structure, and by using deques.
def __init__(self, iterable):
self.q = q = deque(iterable) # track the order that items are added
n = int(len(q) ** 0.5) # compute optimal chunk size
self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences
row, self.midoffset = divmod((len(q) - 1) // 2, n) # locate the mid-point (median)
self.midchunk = self.chunks[row]
def update(self, new_elem, check_invariants=False):
# update the main queue that tracks values in the order added
old_elem = self.q.popleft()
self.q.append(new_elem)
# insert new element into appropriate chunk (where new_elem less that rightmost elem in chunk)
chunks = self.chunks
for new_index, chunk in enumerate(chunks):
if new_elem < chunk[-1]:
break
data = list(chunk)
insort(data, new_elem)
chunk.clear()
chunk.extend(data)
# remove oldest element
for old_index, chunk in enumerate(chunks):
if old_elem <= chunk[-1]:
break
chunk.remove(old_elem)
# restore the chunk sizes by shifting end elements
# chunk at new_index is too fat
# chunk at old_index is too thin
if new_index < old_index:
# propagate the now-too-fat chunk to the right
for i in range(new_index, old_index):
chunks[i+1].appendleft(chunks[i].pop())
elif new_index > old_index:
# feed the now-too-thin chunk from the right
for i in range(old_index, new_index):
chunks[i].append(chunks[i+1].popleft())
# check-invariants (slow)
if check_invariants:
assert sorted(self.q) == [elem for chunk in chunks for elem in chunk]
assert len(set(map(len, chunks))) <= 2 # are chunk sizes balanced ?
# return median
return self.midchunk[self.midoffset]
if __name__ == '__main__':
from random import randrange
from itertools import islice
window_size = 41
data = [randrange(200) for i in range(1000)]
it = iter(data)
r = RunningMedian(islice(it, window_size))
medians = [r.update(elem, True) for elem in it]
midpoint = (window_size - 1) // 2
median = lambda window: sorted(window)[midpoint]
target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)]
assert medians == target_medians
print(medians)
|
Discussion
Update time of O(sqrt(n)) beats using the QuickSelect algorithm (recipe 269554) which takes O(n) time for window size n.
Tested under Python 2.5, 2.6, and 3.1.


Comments
Seems greate ! Right what I was looking for. I understand it only works in python 3 though. Is there a way to have it run under python 2.5 ? I would do it but I lack the competence to do so. Under 2.5, when I do a "python running_median.py" it says :
running_median.py:7: DeprecationWarning: integer argument expected, got float return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] Traceback (most recent call last): File "running_median.py", line 85, in <module> r = RunningMedian(islice(it, window_size)) File "running_median.py", line 32, in __init__ self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences File "running_median.py", line 7, in grouper return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] TypeError: slice indices must be integers or None or have an __index__ method
Looks like the __index__ method is not implemented but I don't know what to do from here.
Updated so that it works under Python 2.5, 2.6, and 3.1. The change was to wrap the floor() call in int(). In Py3.1 floor() returns an integer; in Py2.5 and 2.6 it returns a float.
Faster version - O(log**2 n) update() using blist
Using the blist library (a list implemented with B+trees having O(log n) lookup, insertion and deletion), it is possible to implement this in a faster, and simpler, way. Searches with bisect_left and insort are O(log**2 n).
With some quick benchmarking, the blist version is 2.25 times faster on my laptop using a window size of 10,000; this speedup grows to 3.90x with a 100,000 window size.
Sign in to comment