Welcome, guest | Sign In | My Account | Store | Cart

Maintains sorted data as new elements are added and old one removed as a sliding window advances over a stream of data. Also gives fast indexed access to value. Running time per median update is proportional to the log of the window size.

Python, 216 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from random import random
from math import log, ceil

class Node(object):
    __slots__ = 'value', 'next', 'width'
    def __init__(self, value, next, width):
        self.value, self.next, self.width = value, next, width

class End(object):
    'Sentinel object that always compares greater than another object'
    def __cmp__(self, other):
        return 1

NIL = Node(End(), [], [])               # Singleton terminator node

class IndexableSkiplist:
    'Sorted collection supporting O(lg n) insertion, removal, and lookup by rank.'

    def __init__(self, expected_size=100):
        self.size = 0
        self.maxlevels = int(1 + log(expected_size, 2))
        self.head = Node('HEAD', [NIL]*self.maxlevels, [1]*self.maxlevels)

    def __len__(self):
        return self.size

    def __getitem__(self, i):
        node = self.head
        i += 1
        for level in reversed(range(self.maxlevels)):
            while node.width[level] <= i:
                i -= node.width[level]
                node = node.next[level]
        return node.value

    def insert(self, value):
        # find first node on each level where node.next[levels].value > value
        chain = [None] * self.maxlevels
        steps_at_level = [0] * self.maxlevels
        node = self.head
        for level in reversed(range(self.maxlevels)):
            while node.next[level].value <= value:
                steps_at_level[level] += node.width[level]
                node = node.next[level]
            chain[level] = node

        # insert a link to the newnode at each level
        d = min(self.maxlevels, 1 - int(log(random(), 2.0)))
        newnode = Node(value, [None]*d, [None]*d)
        steps = 0
        for level in range(d):
            prevnode = chain[level]
            newnode.next[level] = prevnode.next[level]
            prevnode.next[level] = newnode
            newnode.width[level] = prevnode.width[level] - steps
            prevnode.width[level] = steps + 1
            steps += steps_at_level[level]
        for level in range(d, self.maxlevels):
            chain[level].width[level] += 1
        self.size += 1

    def remove(self, value):
        # find first node on each level where node.next[levels].value >= value
        chain = [None] * self.maxlevels
        node = self.head
        for level in reversed(range(self.maxlevels)):
            while node.next[level].value < value:
                node = node.next[level]
            chain[level] = node
        if value != chain[0].next[0].value:
            raise KeyError('Not Found')

        # remove one link at each level
        d = len(chain[0].next[0].next)
        for level in range(d):
            prevnode = chain[level]
            prevnode.width[level] += prevnode.next[level].width[level] - 1
            prevnode.next[level] = prevnode.next[level].next[level]
        for level in range(d, self.maxlevels):
            chain[level].width[level] -= 1
        self.size -= 1

    def __iter__(self):
        'Iterate over values in sorted order'
        node = self.head.next[0]
        while node is not NIL:
            yield node.value
            node = node.next[0]

from collections import deque
from itertools import islice

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]


if __name__ == '__main__':

    #########################################################################
    # Demonstrate the RunningMedian() class
    # Compare results to alternative class implemented using a regular list

    from bisect import insort
    from random import randrange

    class RunningMedianSlow:
        'Slow running-median with O(n) updates where n is the window size'

        def __init__(self, n, iterable):
            self.it = iter(iterable)
            self.queue = deque(islice(self.it, n))
            self.sortedlist = sorted(self.queue)

        def __iter__(self):
            queue = self.queue
            sortedlist = self.sortedlist
            midpoint = len(queue) // 2
            yield sortedlist[midpoint]
            for newelem in self.it:
                oldelem = queue.popleft()
                sortedlist.remove(oldelem)
                queue.append(newelem)
                insort(sortedlist, newelem)
                yield sortedlist[midpoint]

    M, N, window = 5000, 8000, 1001
    data = [randrange(M) for i in range(N)]
    result = list(RunningMedian(window, data))
    expected = list(RunningMedianSlow(window, data))
    assert result == expected
    print 'Successful test of RunningMedian() with', N,
    print 'items and a window of size', window, '\n'


    #########################################################################
    # Demonstrate and test the IndexableSkiplist() collection class

    from random import shuffle

    demo = list('FOURSQUARE')
    excluded = list('XZ')

    print 'Building an indexable skiplist containing %r:' % ''.join(demo)
    s = IndexableSkiplist(len(demo))                        # exercise __init__()
    shuffle(demo)
    for char in demo:
        s.insert(char)                                      # check insert()
        print 'Adding: ', char, list(s), len(s)

    for char in excluded:
        print 'Attempting to remove:', char, '-->',
        try:
            s.remove(char)                                  # check remove() item not in skiplist
        except KeyError:
            print 'Successfully not found'

    assert len(s) == len(demo)                              # check __len__()
    assert list(s) == sorted(demo)                          # check __iter__()
    assert [s[i] for i in range(len(s))] == sorted(demo)    # check __getitem__()

    shuffle(demo)
    for char in demo:
        s.remove(char)                                      # check remove() item in skiplist
        print 'Removing: ', char, list(s), len(s)
    assert list(s) == [] and len(s) == 0


    #############################################################################
    # Show the IndexableSkiplist's internal structure and describe its invariants

    def iter_levels(skiplist, level):
        'Iterate through nodes on a given level.'
        node = skiplist.head
        while node is not NIL:
            yield node
            node = node.next[level]

    demo = list('HOPSCOTCH')                    # create a sample skiplist
    s = IndexableSkiplist(len(demo))
    for char in demo:
        s.insert(char)

    print '\nStructure of an indexable skiplist containing %r:' % ''.join(demo)
    for i in reversed(range(s.maxlevels)):      # show the link structure on each level
        print 'Level %d:  ' % i,
        line = ''
        for n in iter_levels(s, i):
            line += '%s%d %s' % (n.value, n.width[i], '   ' * (n.width[i] - 1))
        print line + 'NIL'

    print '\nNote the following structure invariants:'
    print '* Level 0 has widths all equal to one.'
    print '* The sum of widths on each level == len(skiplist)+1.'
    print '* Entries on higher levels are subsets of the lower levels.'
    print '* Widths of entries on higer levels sum to the widths below.'
    print '    For example, a P --> T link of two steps on one level'
    print '    corresponds to P --> S --> T links of one step each on level 0.'

The story behind this recipe is recounted at: http://rhettinger.wordpress.com/2010/02/06/lost-knowledge/

Tested under Python 2.5 and 2.6

A skiplist, http://en.wikipedia.org/wiki/Skip_list, is a "data structure for storing a sorted list of items, using a hierarchy of linked lists that connect increasingly sparse subsequences of the items."

I've augmented the basic structure so that each of the links is accompanied by a width which tracks how many items are spanned by higher level links. The widths allow the the skiplist to be indexable using the __getitem__() method.

The three major operations all run at O(ln n) speed:

  • insert value
  • remove value
  • lookup value by rank

Listing or iterating the structure yields the entries in sorted order. The sort is stable.

This structure is perfect for solving problems like tracking a running median which requires sort order to be maintained as new items are added and old items are deleted and which requires fast access to the n-th item to find the median or quartiles.

The O(log n) update times beat other published solutions such as "Efficient Algorithm for Computing a Running Median" by Soymya D. Mohanty whose algorithm's update speed is O(sqrt(n)).

This recipe scales nicely and can handle million element sliding windows.

Here is sample output from the test code in the above recipe:

Successful test of RunningMedian() with 8000 items and a window of size 1001 

Building an indexable skiplist containing 'FOURSQUARE':
Adding:  A ['A'] 1
Adding:  O ['A', 'O'] 2
Adding:  R ['A', 'O', 'R'] 3
Adding:  S ['A', 'O', 'R', 'S'] 4
Adding:  U ['A', 'O', 'R', 'S', 'U'] 5
Adding:  U ['A', 'O', 'R', 'S', 'U', 'U'] 6
Adding:  E ['A', 'E', 'O', 'R', 'S', 'U', 'U'] 7
Adding:  Q ['A', 'E', 'O', 'Q', 'R', 'S', 'U', 'U'] 8
Adding:  F ['A', 'E', 'F', 'O', 'Q', 'R', 'S', 'U', 'U'] 9
Adding:  R ['A', 'E', 'F', 'O', 'Q', 'R', 'R', 'S', 'U', 'U'] 10
Attempting to remove: X --> Successfully not found
Attempting to remove: Z --> Successfully not found
Removing:  A ['E', 'F', 'O', 'Q', 'R', 'R', 'S', 'U', 'U'] 9
Removing:  Q ['E', 'F', 'O', 'R', 'R', 'S', 'U', 'U'] 8
Removing:  R ['E', 'F', 'O', 'R', 'S', 'U', 'U'] 7
Removing:  U ['E', 'F', 'O', 'R', 'S', 'U'] 6
Removing:  O ['E', 'F', 'R', 'S', 'U'] 5
Removing:  E ['F', 'R', 'S', 'U'] 4
Removing:  F ['R', 'S', 'U'] 3
Removing:  U ['R', 'S'] 2
Removing:  R ['S'] 1
Removing:  S [] 0

Structure of an indexable skiplist containing 'HOPSCOTCH':
Level 3:   HEAD1 C9                         NIL
Level 2:   HEAD1 C3       H6                NIL
Level 1:   HEAD1 C1 C1 H1 H1 O3       S2    NIL
Level 0:   HEAD1 C1 C1 H1 H1 O1 O1 P1 S1 T1 NIL

Note the following structure invariants:
* Level 0 has widths all equal to one.
* The sum of widths on each level is len(skiplist) + 1.
* Entries on higher levels are subsets of the lower levels.
* Widths of entries on higher levels sum to the widths below.
    For example, a P --> T link of two steps on one level
    corresponds to P --> S --> T links of one step each on level 0.

12 comments

Nicolas Barbey 14 years, 5 months ago  # | flag

Seems greate ! Right what I was looking for. I understand it only works in python 3 though. Is there a way to have it run under python 2.5 ? I would do it but I lack the competence to do so. Under 2.5, when I do a "python running_median.py" it says :

running_median.py:7: DeprecationWarning: integer argument expected, got float return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] Traceback (most recent call last): File "running_median.py", line 85, in <module> r = RunningMedian(islice(it, window_size)) File "running_median.py", line 32, in __init__ self.chunks = grouper(sorted(q), n) # sorted list of sorted n-length subsequences File "running_median.py", line 7, in grouper return [deque(seq[i:i+n]) for i in range(0, (len(seq)+n-1)//nn, n)] TypeError: slice indices must be integers or None or have an __index__ method

Looks like the __index__ method is not implemented but I don't know what to do from here.

Raymond Hettinger (author) 14 years, 5 months ago  # | flag

Updated so that it works under Python 2.5, 2.6, and 3.1. The change was to wrap the floor() call in int(). In Py3.1 floor() returns an integer; in Py2.5 and 2.6 it returns a float.

Matteo Dell'Amico 14 years, 5 months ago  # | flag

Faster version - O(log**2 n) update() using blist

Using the blist library (a list implemented with B+trees having O(log n) lookup, insertion and deletion), it is possible to implement this in a faster, and simpler, way. Searches with bisect_left and insort are O(log**2 n).

from collections import deque
from bisect import bisect_left, insort
from blist import blist

class RunningMedian:

    def __init__(self, iterable):
        self.q = q = deque(iterable)
        self.l = l = blist(q)
        l.sort()
        self.mididx = (len(q) - 1) // 2

    def update(self, new_elem, check_invariants=False):
        q, l = self.q, self.l
        old_elem = q.popleft()
        q.append(new_elem)
        del l[bisect_left(l, old_elem)]
        insort(l, new_elem)

        if check_invariants:
            assert l == sorted(q)

        return l[self.mididx]
Matteo Dell'Amico 14 years, 5 months ago  # | flag

With some quick benchmarking, the blist version is 2.25 times faster on my laptop using a window size of 10,000; this speedup grows to 3.90x with a 100,000 window size.

bearophile - 14 years, 1 month ago  # | flag

If the window width is fixed, you can use just a circular queue that is always full and just overwrites the oldest item, updating an index or pointer. It's simpler and probably faster than a full deque made of a linked list of short arrays as the Python one. If the input data is indexable you can just keep an index.

bearophile - 14 years, 1 month ago  # | flag

I meant something like this (not tested):

from bisect import bisect_left, insort
from blist import blist

class RunningMedian:
    def __init__(self, first_window):
        self.queue = list(first_window) # circular queue
        self.old_idx = 0 # index of the oldest item of the queue
        self.width = len(self.queue)
        self.midpoint = self.width // 2
        self.bl = blist(self.queue)
        self.bl.sort()

    def update(self, new_item):
        # swap old item with new one
        old_item = self.queue[self.old_idx]
        self.queue[self.old_idx] = new_item
        self.old_idx = (self.old_idx + 1) % self.width

        del self.bl[bisect_left(self.bl, old_item)]
        insort(self.bl, new_item)
        return self.bl[self.midpoint]

With a fixed window, Python's deque already acts as a circular queue. It is blazingly fast.

Besides, the queue is the fast O(1) component of the algorithm. Optimization efforts should be focused on the O(log n) skiplist insertion/deletion/lookup steps.

Alasdair Macmillan 10 years, 5 months ago  # | flag

Could this be modified to let you set the index value on insert like Redis' sorted sets?

Anurag Singh 9 years, 7 months ago  # | flag

Can someone please explain the algorithm ? don't know Python, so unable to understand.

Grant Jenks 9 years, 6 months ago  # | flag

Rather than using the blist module, try using sortedcontainers and I'll bet it's even faster. It implements sorted list, sorted dict, and sorted set data types in pure-Python and is fast-as-C implementations (even faster!). Learn more about sortedcontainers, available on PyPI and github.

Jan Schlüter 9 years ago  # | flag

Hey, maybe things have changed in the past five years, but I cannot see any improved speed from using blist.blist. And while sortedcontainers.sortedlist is indeed faster than blist.sortedlist, just using Matteo's bisect_left and insort code with a plain builtin list instead of the blist.blist gives the best performance for me, for window sizes between 100 and 10,000 (for smaller windows, scipy.ndimage.filters.median_filter is faster).

In particular, these are the timings I get (Python 2.7.6, numpy 1.8.2, scipy 0.13.3, i7-4770S CPU @ 3.10GHz):

Sequence length 100000, filter width 49:
  scipy.signal.medfilt: in 2.301e-01s;
  scipy.ndimage.filter.median_filter: in 6.000e-02s;
  using blist.blist: in 9.862e-02s;
  using list: in 8.571e-02s;
  using blist.sortedlist: in 5.502e-01s;
  using sortedcontainers.SortedList: in 2.862e-01s;
Sequence length 100000, filter width 99:
  scipy.signal.medfilt: in 5.077e-01s;
  scipy.ndimage.filter.median_filter: in 1.114e-01s;
  using blist.blist: in 1.057e-01s;
  using list: in 9.247e-02s;
  using blist.sortedlist: in 5.998e-01s;
  using sortedcontainers.SortedList: in 2.967e-01s;
Sequence length 100000, filter width 599:
  scipy.signal.medfilt: in 3.937e+00s;
  scipy.ndimage.filter.median_filter: in 6.031e-01s;
  using blist.blist: in 2.391e-01s;
  using list: in 1.173e-01s;
  using blist.sortedlist: in 8.984e-01s;
  using sortedcontainers.SortedList: in 3.200e-01s;
Sequence length 100000, filter width 9999:
  scipy.signal.medfilt: in 8.664e+01s;
  scipy.ndimage.filter.median_filter: in 1.030e+01s;
  using blist.blist: in 6.183e-01s;
  using list: in 3.834e-01s;
  using blist.sortedlist: in 1.595e+00s;
  using sortedcontainers.SortedList: in 5.080e-01s;

I've put the benchmark code in a github gist -- please let me know if you see anything wrong with it.

Probably the fastest solution would be a scipy.ndimage.filters.median_filter1d maintaining a sorted window in C.

Grant Jenks 8 years, 6 months ago  # | flag

I commented on Jan's github gist but wanted to add the results here.

For window sizes less than ~20,000 the best pure-Python solution is just to use list with bisect module functions. For larger window sizes, the sortedcontainers.SortedList data type performs fastest.