Welcome, guest | Sign In | My Account | Store | Cart
```from collections import deque
from bisect import insort

def grouper(seq, n):
'Split sequence into n-length deques'
return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)]

class RunningMedian:
''' Updates median value over a sliding window.

Initialize with a window size of data.  The call update(elem) with
new values.  Returns the new median (for an odd-sized window).

'''
# Gains speed by maintaining a sort while inserting new values
# and deleting the oldest ones.

# Reduction of insert/delete time is done by chunking sorted data
# into subgroups organized as deques, so that appending and popping
# are cheap.  Running-time per update is proportional to the
# square-root of the window size.

# Inspired by "Efficient Algorithm for computing a Running Median"
# by Soumya D. Mohanty.  Adapted for Python by eliminating the
# map structure, check structure, and by using deques.

def __init__(self, iterable):
self.q = q = deque(iterable)                            # track the order that items are added
n = int(len(q) ** 0.5)                                  # compute optimal chunk size
self.chunks = grouper(sorted(q), n)                     # sorted list of sorted n-length subsequences
row, self.midoffset = divmod((len(q) - 1) // 2, n)      # locate the mid-point (median)
self.midchunk = self.chunks[row]

def update(self, new_elem, check_invariants=False):
# update the main queue that tracks values in the order added
old_elem = self.q.popleft()
self.q.append(new_elem)

# insert new element into appropriate chunk (where new_elem less that rightmost elem in chunk)
chunks = self.chunks
for new_index, chunk in enumerate(chunks):
if new_elem < chunk[-1]:
break
data = list(chunk)
insort(data, new_elem)
chunk.clear()
chunk.extend(data)

# remove oldest element
for old_index, chunk in enumerate(chunks):
if old_elem <= chunk[-1]:
break
chunk.remove(old_elem)

# restore the chunk sizes by shifting end elements
# chunk at new_index is too fat
# chunk at old_index is too thin
if new_index < old_index:
# propagate the now-too-fat chunk to the right
for i in range(new_index, old_index):
chunks[i+1].appendleft(chunks[i].pop())
elif new_index > old_index:
# feed the now-too-thin chunk from the right
for i in range(old_index, new_index):
chunks[i].append(chunks[i+1].popleft())

# check-invariants (slow)
if check_invariants:
assert sorted(self.q) == [elem for chunk in chunks for elem in chunk]
assert len(set(map(len, chunks))) <= 2              # are chunk sizes balanced ?

# return median
return self.midchunk[self.midoffset]

if __name__ == '__main__':
from random import randrange
from itertools import islice

window_size = 41
data = [randrange(200) for i in range(1000)]

it = iter(data)
r = RunningMedian(islice(it, window_size))
medians = [r.update(elem, True) for elem in it]

midpoint = (window_size - 1) // 2
median = lambda window: sorted(window)[midpoint]
target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)]

assert medians == target_medians
print(medians)
```