ActiveState Code

Recipe 576930: Efficient Algorithm for computing a Running Median


Maintains sorted data as new elements are added and old one removed as a sliding window advances over a stream of data. Running time per median calculation is proportional to the square-root of the window size.

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from collections import deque
from bisect import insort

def grouper(seq, n):
    'Split sequence into n-length deques'
    return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//n*n, n)]


class RunningMedian:
    ''' Updates median value over a sliding window.

    Initialize with a window size of data.  The call update(elem) with
    new values.  Returns the new median (for an odd-sized window).

    '''
    # Gains speed by maintaining a sort while inserting new values
    # and deleting the oldest ones.

    # Reduction of insert/delete time is done by chunking sorted data
    # into subgroups organized as deques, so that appending and popping
    # are cheap.  Running-time per update is proportional to the
    # square-root of the window size.

    # Inspired by "Efficient Algorithm for computing a Running Median"
    # by Soumya D. Mohanty.  Adapted for Python by eliminating the
    # map structure, check structure, and by using deques.

    def __init__(self, iterable):
        self.q = q = deque(iterable)                            # track the order that items are added
        n = int(len(q) ** 0.5)                                  # compute optimal chunk size
        self.chunks = grouper(sorted(q), n)                     # sorted list of sorted n-length subsequences
        row, self.midoffset = divmod((len(q) - 1) // 2, n)      # locate the mid-point (median)
        self.midchunk = self.chunks[row]

    def update(self, new_elem, check_invariants=False):
        # update the main queue that tracks values in the order added
        old_elem = self.q.popleft()
        self.q.append(new_elem)

        # insert new element into appropriate chunk (where new_elem less that rightmost elem in chunk)
        chunks = self.chunks
        for new_index, chunk in enumerate(chunks):
            if new_elem < chunk[-1]:
                break
        data = list(chunk)
        insort(data, new_elem)
        chunk.clear()
        chunk.extend(data)

        # remove oldest element
        for old_index, chunk in enumerate(chunks):
            if old_elem <= chunk[-1]:
                break
        chunk.remove(old_elem)

        # restore the chunk sizes by shifting end elements
        # chunk at new_index is too fat
        # chunk at old_index is too thin
        if new_index < old_index:
            # propagate the now-too-fat chunk to the right
            for i in range(new_index, old_index):
                chunks[i+1].appendleft(chunks[i].pop())
        elif new_index > old_index:
            # feed the now-too-thin chunk from the right
            for i in range(old_index, new_index):
                chunks[i].append(chunks[i+1].popleft())

        # check-invariants (slow)
        if check_invariants:
            assert sorted(self.q) == [elem for chunk in chunks for elem in chunk]
            assert len(set(map(len, chunks))) <= 2              # are chunk sizes balanced ?

        # return median
        return self.midchunk[self.midoffset]



if __name__ == '__main__':
    from random import randrange
    from itertools import islice

    window_size = 41
    data = [randrange(200) for i in range(1000)]

    it = iter(data)
    r = RunningMedian(islice(it, window_size))
    medians = [r.update(elem, True) for elem in it]

    midpoint = (window_size - 1) // 2
    median = lambda window: sorted(window)[midpoint]
    target_medians = [median(data[i:i+window_size]) for i in range(1, len(data)-window_size+1)]

    assert medians == target_medians
    print(medians)

Discussion

Update time of O(sqrt(n)) beats using the QuickSelect algorithm (recipe 269554) which takes O(n) time for window size n.

Tested under Python 2.5, 2.6, and 3.1.

Comments

  1. 1. At 9:05 a.m. on 15 oct 2009, Nicolas Barbey said:

    Seems greate ! Right what I was looking for. I understand it only works in python 3 though. Is there a way to have it run under python 2.5 ? I would do it but I lack the competence to do so. Under 2.5, when I do a "python running_median.py" it says :

    running_median.py:7: DeprecationWarning: integer argument expected, got float return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] Traceback (most recent call last): File "running_median.py", line 85, in <module> r = RunningMedian(islice(it, window_size)) File "running_median.py", line 32, in __init__ self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences File "running_median.py", line 7, in grouper return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] TypeError: slice indices must be integers or None or have an __index__ method

    Looks like the __index__ method is not implemented but I don't know what to do from here.

  2. 2. At 12:10 p.m. on 15 oct 2009, Raymond Hettinger (the author) said:

    Updated so that it works under Python 2.5, 2.6, and 3.1. The change was to wrap the floor() call in int(). In Py3.1 floor() returns an integer; in Py2.5 and 2.6 it returns a float.

  3. 3. At 7:08 a.m. on 19 oct 2009, Matteo Dell'Amico said:

    Faster version - O(log**2 n) update() using blist

    Using the blist library (a list implemented with B+trees having O(log n) lookup, insertion and deletion), it is possible to implement this in a faster, and simpler, way. Searches with bisect_left and insort are O(log**2 n).

    from collections import deque
    from bisect import bisect_left, insort
    from blist import blist
    
    class RunningMedian:
    
        def __init__(self, iterable):
            self.q = q = deque(iterable)
            self.l = l = blist(q)
            l.sort()
            self.mididx = (len(q) - 1) // 2
    
        def update(self, new_elem, check_invariants=False):
            q, l = self.q, self.l
            old_elem = q.popleft()
            q.append(new_elem)
            del l[bisect_left(l, old_elem)]
            insort(l, new_elem)
    
            if check_invariants:
                assert l == sorted(q)
    
            return l[self.mididx]
    
  4. 4. At 7:13 a.m. on 19 oct 2009, Matteo Dell'Amico said:

    With some quick benchmarking, the blist version is 2.25 times faster on my laptop using a window size of 10,000; this speedup grows to 3.90x with a 100,000 window size.

Sign in to comment