Maintains sorted data as new elements are added and old one removed as a sliding window advances over a stream of data. Running time per median calculation is proportional to the square-root of the window size.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
from collections import deque from bisect import insort def grouper(seq, n): 'Split sequence into n-length deques' return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)] class RunningMedian: ''' Updates median value over a sliding window. Initialize with a window size of data. The call update(elem) with new values. Returns the new median (for an odd-sized window). ''' # Gains speed by maintaining a sort while inserting new values # and deleting the oldest ones. # Reduction of insert/delete time is done by chunking sorted data # into subgroups organized as deques, so that appending and popping # are cheap. Running-time per update is proportional to the # square-root of the window size. # Inspired by "Efficient Algorithm for computing a Running Median" # by Soumya D. Mohanty. Adapted for Python by eliminating the # map structure, check structure, and by using deques. def __init__(self, iterable): self.q = q = deque(iterable) # track the order that items are added n = int(len(q) ** 0.5) # compute optimal chunk size self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences row, self.midoffset = divmod((len(q) - 1) // 2, n) # locate the mid-point (median) self.midchunk = self.chunks[row] def update(self, new_elem, check_invariants=False): # update the main queue that tracks values in the order added old_elem = self.q.popleft() self.q.append(new_elem) # insert new element into appropriate chunk (where new_elem less that rightmost elem in chunk) chunks = self.chunks for new_index, chunk in enumerate(chunks): if new_elem < chunk[-1]: break data = list(chunk) insort(data, new_elem) chunk.clear() chunk.extend(data) # remove oldest element for old_index, chunk in enumerate(chunks): if old_elem <= chunk[-1]: break chunk.remove(old_elem) # restore the chunk sizes by shifting end elements # chunk at new_index is too fat # chunk at old_index is too thin if new_index < old_index: # propagate the now-too-fat chunk to the right for i in range(new_index, old_index): chunks[i+1].appendleft(chunks[i].pop()) elif new_index > old_index: # feed the now-too-thin chunk from the right for i in range(old_index, new_index): chunks[i].append(chunks[i+1].popleft()) # check-invariants (slow) if check_invariants: assert sorted(self.q) == [elem for chunk in chunks for elem in chunk] assert len(set(map(len, chunks))) <= 2 # are chunk sizes balanced ? # return median return self.midchunk[self.midoffset] if __name__ == '__main__': from random import randrange from itertools import islice window_size = 41 data = [randrange(200) for i in range(1000)] it = iter(data) r = RunningMedian(islice(it, window_size)) medians = [r.update(elem, True) for elem in it] midpoint = (window_size - 1) // 2 median = lambda window: sorted(window)[midpoint] target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)] assert medians == target_medians print(medians)
Inspired by "Efficient Algorithm for computing a Running Median" by Soumya D. Mohanty. Adapted for Python by eliminating the map structure, check structure, and by using deques.
There are faster ways to compute a running median using skiplists or B+ trees, but this above algorithm is the best among published papers.