Maintains sorted data as new elements are added and old one removed as a sliding window advances over a stream of data. Running time per median calculation is proportional to the square-root of the window size.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | from collections import deque
from bisect import insort
def grouper(seq, n):
'Split sequence into n-length deques'
return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)]
class RunningMedian:
''' Updates median value over a sliding window.
Initialize with a window size of data. The call update(elem) with
new values. Returns the new median (for an odd-sized window).
'''
# Gains speed by maintaining a sort while inserting new values
# and deleting the oldest ones.
# Reduction of insert/delete time is done by chunking sorted data
# into subgroups organized as deques, so that appending and popping
# are cheap. Running-time per update is proportional to the
# square-root of the window size.
# Inspired by "Efficient Algorithm for computing a Running Median"
# by Soumya D. Mohanty. Adapted for Python by eliminating the
# map structure, check structure, and by using deques.
def __init__(self, iterable):
self.q = q = deque(iterable) # track the order that items are added
n = int(len(q) ** 0.5) # compute optimal chunk size
self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences
row, self.midoffset = divmod((len(q) - 1) // 2, n) # locate the mid-point (median)
self.midchunk = self.chunks[row]
def update(self, new_elem, check_invariants=False):
# update the main queue that tracks values in the order added
old_elem = self.q.popleft()
self.q.append(new_elem)
# insert new element into appropriate chunk (where new_elem less that rightmost elem in chunk)
chunks = self.chunks
for new_index, chunk in enumerate(chunks):
if new_elem < chunk[-1]:
break
data = list(chunk)
insort(data, new_elem)
chunk.clear()
chunk.extend(data)
# remove oldest element
for old_index, chunk in enumerate(chunks):
if old_elem <= chunk[-1]:
break
chunk.remove(old_elem)
# restore the chunk sizes by shifting end elements
# chunk at new_index is too fat
# chunk at old_index is too thin
if new_index < old_index:
# propagate the now-too-fat chunk to the right
for i in range(new_index, old_index):
chunks[i+1].appendleft(chunks[i].pop())
elif new_index > old_index:
# feed the now-too-thin chunk from the right
for i in range(old_index, new_index):
chunks[i].append(chunks[i+1].popleft())
# check-invariants (slow)
if check_invariants:
assert sorted(self.q) == [elem for chunk in chunks for elem in chunk]
assert len(set(map(len, chunks))) <= 2 # are chunk sizes balanced ?
# return median
return self.midchunk[self.midoffset]
if __name__ == '__main__':
from random import randrange
from itertools import islice
window_size = 41
data = [randrange(200) for i in range(1000)]
it = iter(data)
r = RunningMedian(islice(it, window_size))
medians = [r.update(elem, True) for elem in it]
midpoint = (window_size - 1) // 2
median = lambda window: sorted(window)[midpoint]
target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)]
assert medians == target_medians
print(medians)
|
Inspired by "Efficient Algorithm for computing a Running Median" by Soumya D. Mohanty. Adapted for Python by eliminating the map structure, check structure, and by using deques.
There are faster ways to compute a running median using skiplists or B+ trees, but this above algorithm is the best among published papers.
Jan has a great comment on the old 576930 recipe benchmarking various options. For large window sizes (> ~20,000) the sortedcontainers
SortedList
implementation performs fastest. For smaller window sizes, simply using a plainlist
withbisect_left
andinsort
functions frombisect
is faster.