Welcome, guest | Sign In | My Account | Store | Cart

The following is a small contribution that I hope can be useful to Python programmers for the calculation of the running median, mean and mode. It is important to note that all the "running" calculations are done for full windows. Here is a simple example:

y = [1, 2, 3, 3, 1, 4], with a sliding window of size = 3 for the running estimations, means = 2, 2.6667, 2.3333, 2.6667 medians = 2, 3, 3, 3 modes = 1, 3, 3, 1

Python, 162 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
 Purpose: The main purpose is to demonstrate how to find the running median, mode and 
          mean over a sequence (list) of integers or reals or a mix of integers and reals.
          The secondary purpose, is to inspire Python programmers to explore some of
          the powerful packages (e.g. collections) available to the Python community and 
          to learn more about list comprehension and lambda functions.
    Note:        
       1. Much of the code here has been taken from code posted to the web (e.g. stackoverflow)
          by other Python programmers (e.g. Peter Otten)

  Author: V. Stokes (vs@it.uu.se)  
 Version: 2013.03.06

"""
import numpy as np

#*******************************************************

from collections import deque,Counter
from bisect import insort, bisect_left
from itertools import islice

def RunningMode(seq,N,M):
    """
    Purpose: Find the mode for the points in a sliding window as it 
             is moved from left (beginning of seq) to right (end of seq)
             by one point at a time.
     Inputs:
          seq -- list containing items for which a running mode (in a sliding window) is 
                 to be calculated
            N -- length of sequence                      
            M -- number of items in window (window size) -- must be an integer > 1
     Otputs:
        modes -- list of modes with size M - N + 1
       Note:
         1. The mode is the value that appears most often in a set of data.
         2. In the case of ties it the last of the ties that is taken as the mode (this
            is not by definition).
    """    
    # Load deque with first window of seq 
    d = deque(seq[0:M]) 

    modes = [Counter(d).most_common(1)[0][0]]  # contains mode of first window

    # Now slide the window by one point to the right for each new position (each pass through 
    # the loop). Stop when the item in the right end of the deque contains the last item in seq
    for item in islice(seq,M,N):
        old = d.popleft()                      # pop oldest from left
        d.append(item)                         # push newest in from right
        modes.append(Counter(d).most_common(1)[0][0])        
    return modes    

def RunningMedian(seq, M):
    """
     Purpose: Find the median for the points in a sliding window (odd number in size) 
              as it is moved from left to right by one point at a time.
      Inputs:
            seq -- list containing items for which a running median (in a sliding window) 
                   is to be calculated
              M -- number of items in window (window size) -- must be an integer > 1
      Otputs:
         medians -- list of medians with size N - M + 1
       Note:
         1. The median of a finite list of numbers is the "center" value when this list
            is sorted in ascending order. 
         2. If M is an even number the two elements in the window that
            are close to the center are averaged to give the median (this
            is not by definition)
    """   
    seq = iter(seq)
    s = []   
    m = M // 2

    # Set up list s (to be sorted) and load deque with first window of seq
    s = [item for item in islice(seq,M)]    
    d = deque(s)

    # Simple lambda function to handle even/odd window sizes    
    median = lambda : s[m] if bool(M&1) else (s[m-1]+s[m])*0.5

    # Sort it in increasing order and extract the median ("center" of the sorted window)
    s.sort()    
    medians = [median()]   

    # Now slide the window by one point to the right for each new position (each pass through 
    # the loop). Stop when the item in the right end of the deque contains the last item in seq
    for item in seq:
        old = d.popleft()          # pop oldest from left
        d.append(item)             # push newest in from right
        del s[bisect_left(s, old)] # locate insertion point and then remove old 
        insort(s, item)            # insert newest such that new sort is not required        
        medians.append(median())  
    return medians

def RunningMean(seq,N,M):
    """
     Purpose: Find the mean for the points in a sliding window (fixed size) 
              as it is moved from left to right by one point at a time.
      Inputs:
          seq -- list containing items for which a mean (in a sliding window) is 
                 to be calculated (N items)
            N -- length of sequence     
            M -- number of items in sliding window
      Otputs:
        means -- list of means with size N - M + 1    

    """    
    # Load deque (d) with first window of seq
    d = deque(seq[0:M])
    means = [np.mean(d)]             # contains mean of first window
    # Now slide the window by one point to the right for each new position (each pass through 
    # the loop). Stop when the item in the right end of the deque contains the last item in seq
    for item in islice(seq,M,N):
        old = d.popleft()            # pop oldest from left
        d.append(item)               # push newest in from right
        means.append(np.mean(d))     # mean for current window
    return means  

#*** Start of test area *****************************************************
#
# Set random seed for repeatability
#np.random.seed(7919) # the 1000th prime number

# Try the following sequences
#yn = np.random.random(18)*10 - 5
yn = [3,2,-1.0,2.0,3.0,5.0,-5.0,6.0,-5.0,4.0,9.0,6.3,1.3,0.0,-7.0,1.3,-5.0]
#yn = [3,2,-1.0,-1.0,-1.0,5.0,5.0]
#yn = [5,5.0,5,5.0,5]
#yn = [3,3,3,3,3,3,3]
#yn = [3.,3.,3.,3.,3.,3.,3.]
#yn = [-1,1,-1,1,-1,1]
#yn = [3,2,-1.0,2.0]
#yn = [5,3,2,2,-1]
#yn = [-5.0,3.0,2.0,2.0,-1.0]

N = len(yn)

# Try the follwing window sizes
#M = 1
#M = 2
M = 3
#M = 4
#M = 5
#M = 6
if M <= N and M >= 1:
    print 'M = %2d,'%M,
    print 'yn:'
    print yn

    means = RunningMean(yn,N,M)
    print ' Means(%d):' %(N-M+1)
    print means

    medians = RunningMedian(yn,M)
    print ' Medians(%d):' %(N-M+1)
    print medians

    modes = RunningMode(yn,N,M)
    print ' Modes(%d):' %(N-M+1)
    print modes
else:
    print 'Window size (M=%d) out of range'%M

These functions are not optimal in anyway; however, they are based on available Python packages and well commented (IMHO) for ease of understanding. This code might be a useful prototype for similar "running" estimations. Such methods are often useful for the analysis of large data sets. Note, that there are no checks on the validity of the input data --- I leave this to those who wish to use the code.

And of course, any feedback (positive or negative) is welcomed :-)