
Maintains sorted data as new elements are added and old ones are removed while a sliding window advances over a stream of data. The running time per median calculation is proportional to the square root of the window size.

Python, 94 lines
from collections import deque
from bisect import insort

def grouper(seq, n):
    'Split sequence into n-length deques'
    return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)]


class RunningMedian:
    ''' Updates median value over a sliding window.

    Initialize with a window-sized iterable of data.  Then call update(elem)
    with new values.  Each call returns the new median (for an odd-sized window).

    '''
    # Gains speed by maintaining a sort while inserting new values
    # and deleting the oldest ones.

    # Reduction of insert/delete time is done by chunking sorted data
    # into subgroups organized as deques, so that appending and popping
    # are cheap.  Running-time per update is proportional to the
    # square-root of the window size.

    # Inspired by "Efficient Algorithm for computing a Running Median"
    # by Soumya D. Mohanty.  Adapted for Python by eliminating the
    # map structure, check structure, and by using deques.

    def __init__(self, iterable):
        self.q = q = deque(iterable)                            # track the order that items are added
        n = int(len(q) ** 0.5)                                  # compute optimal chunk size
        self.chunks = grouper(sorted(q), n)                     # sorted list of sorted n-length subsequences
        row, self.midoffset = divmod((len(q) - 1) // 2, n)      # locate the mid-point (median)
        self.midchunk = self.chunks[row]

    def update(self, new_elem, check_invariants=False):
        # update the main queue that tracks values in the order added
        old_elem = self.q.popleft()
        self.q.append(new_elem)

        # insert new element into the appropriate chunk (the first chunk whose
        # rightmost elem exceeds new_elem; falls through to the last chunk otherwise)
        chunks = self.chunks
        for new_index, chunk in enumerate(chunks):
            if new_elem < chunk[-1]:
                break
        data = list(chunk)
        insort(data, new_elem)
        chunk.clear()
        chunk.extend(data)

        # remove oldest element
        for old_index, chunk in enumerate(chunks):
            if old_elem <= chunk[-1]:
                break
        chunk.remove(old_elem)

        # restore the chunk sizes by shifting end elements
        # chunk at new_index is too fat
        # chunk at old_index is too thin
        if new_index < old_index:
            # propagate the now-too-fat chunk to the right
            for i in range(new_index, old_index):
                chunks[i+1].appendleft(chunks[i].pop())
        elif new_index > old_index:
            # feed the now-too-thin chunk from the right
            for i in range(old_index, new_index):
                chunks[i].append(chunks[i+1].popleft())

        # check-invariants (slow)
        if check_invariants:
            assert sorted(self.q) == [elem for chunk in chunks for elem in chunk]
            assert len(set(map(len, chunks))) <= 2              # are chunk sizes balanced ?

        # return median
        return self.midchunk[self.midoffset]



if __name__ == '__main__':
    from random import randrange
    from itertools import islice

    window_size = 41
    data = [randrange(200) for i in range(1000)]

    it = iter(data)
    r = RunningMedian(islice(it, window_size))
    medians = [r.update(elem, True) for elem in it]

    midpoint = (window_size - 1) // 2
    median = lambda window: sorted(window)[midpoint]
    target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)]

    assert medians == target_medians
    print(medians)

Inspired by "Efficient Algorithm for computing a Running Median" by Soumya D. Mohanty. Adapted for Python by eliminating the map structure, check structure, and by using deques.

There are faster ways to compute a running median, for example with skip lists or B+ trees, but the algorithm above is the best among those in published papers.
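
For comparison, here is a minimal sketch of the same sliding-window median built on sortedcontainers.SortedList, the third-party package named in the comment below; the function name and exact interface are illustrative, not part of the recipe:

    from collections import deque
    from sortedcontainers import SortedList    # third-party: pip install sortedcontainers

    def running_median_sortedlist(iterable, window_size):
        'Yield the median of each full (odd-sized) window.'
        q = deque()
        sl = SortedList()
        mid = (window_size - 1) // 2
        for value in iterable:
            q.append(value)
            sl.add(value)                       # keeps the container sorted
            if len(q) > window_size:
                sl.remove(q.popleft())          # discard the oldest value
            if len(q) == window_size:
                yield sl[mid]                   # SortedList is indexable, so the median is a direct lookup

Unlike the recipe's test above, this generator also yields a median for the very first full window.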

1 comment

Grant Jenks 8 years, 7 months ago

Jan has a great comment on the old 576930 recipe benchmarking various options. For large window sizes (> ~20,000) the sortedcontainers SortedList implementation performs fastest. For smaller window sizes, simply using a plain list with bisect_left and insort functions from bisect is faster.
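
For small windows, the plain-list approach the comment describes could look roughly like this (a sketch using only the standard bisect module; the function name is mine):

    from collections import deque
    from bisect import insort, bisect_left

    def running_median_small_window(iterable, window_size):
        'Sliding-window median kept in one plain sorted list.'
        q = deque()
        window = []                                 # always kept sorted
        mid = (window_size - 1) // 2
        for value in iterable:
            q.append(value)
            insort(window, value)                   # binary-search insert, linear-time shift
            if len(q) > window_size:
                window.pop(bisect_left(window, q.popleft()))   # locate and delete the oldest value
            if len(q) == window_size:
                yield window[mid]

Each deletion costs O(window size), but for small windows the low constant factors tend to win, which matches the comment's benchmark observation.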

Created by Raymond Hettinger on Sat, 20 Feb 2010 (MIT)

Required Modules

  • (none specified)
