The following is a small contribution that I hope can be useful to Python programmers for the calculation of the running median, mean and mode. It is important to note that all the "running" calculations are done for full windows. Here is a simple example:
y = [1, 2, 3, 3, 1, 4], with a sliding window of size = 3 for the running estimations, means = 2, 2.6667, 2.3333, 2.6667 medians = 2, 3, 3, 3 modes = 1, 3, 3, 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """
Purpose: The main purpose is to demonstrate how to find the running median, mode and
mean over a sequence (list) of integers or reals or a mix of integers and reals.
The secondary purpose, is to inspire Python programmers to explore some of
the powerful packages (e.g. collections) available to the Python community and
to learn more about list comprehension and lambda functions.
Note:
1. Much of the code here has been taken from code posted to the web (e.g. stackoverflow)
by other Python programmers (e.g. Peter Otten)
Author: V. Stokes (vs@it.uu.se)
Version: 2013.03.06
"""
import numpy as np
#*******************************************************
from collections import deque,Counter
from bisect import insort, bisect_left
from itertools import islice
def RunningMode(seq,N,M):
"""
Purpose: Find the mode for the points in a sliding window as it
is moved from left (beginning of seq) to right (end of seq)
by one point at a time.
Inputs:
seq -- list containing items for which a running mode (in a sliding window) is
to be calculated
N -- length of sequence
M -- number of items in window (window size) -- must be an integer > 1
Otputs:
modes -- list of modes with size M - N + 1
Note:
1. The mode is the value that appears most often in a set of data.
2. In the case of ties it the last of the ties that is taken as the mode (this
is not by definition).
"""
# Load deque with first window of seq
d = deque(seq[0:M])
modes = [Counter(d).most_common(1)[0][0]] # contains mode of first window
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in islice(seq,M,N):
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
modes.append(Counter(d).most_common(1)[0][0])
return modes
def RunningMedian(seq, M):
"""
Purpose: Find the median for the points in a sliding window (odd number in size)
as it is moved from left to right by one point at a time.
Inputs:
seq -- list containing items for which a running median (in a sliding window)
is to be calculated
M -- number of items in window (window size) -- must be an integer > 1
Otputs:
medians -- list of medians with size N - M + 1
Note:
1. The median of a finite list of numbers is the "center" value when this list
is sorted in ascending order.
2. If M is an even number the two elements in the window that
are close to the center are averaged to give the median (this
is not by definition)
"""
seq = iter(seq)
s = []
m = M // 2
# Set up list s (to be sorted) and load deque with first window of seq
s = [item for item in islice(seq,M)]
d = deque(s)
# Simple lambda function to handle even/odd window sizes
median = lambda : s[m] if bool(M&1) else (s[m-1]+s[m])*0.5
# Sort it in increasing order and extract the median ("center" of the sorted window)
s.sort()
medians = [median()]
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in seq:
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
del s[bisect_left(s, old)] # locate insertion point and then remove old
insort(s, item) # insert newest such that new sort is not required
medians.append(median())
return medians
def RunningMean(seq,N,M):
"""
Purpose: Find the mean for the points in a sliding window (fixed size)
as it is moved from left to right by one point at a time.
Inputs:
seq -- list containing items for which a mean (in a sliding window) is
to be calculated (N items)
N -- length of sequence
M -- number of items in sliding window
Otputs:
means -- list of means with size N - M + 1
"""
# Load deque (d) with first window of seq
d = deque(seq[0:M])
means = [np.mean(d)] # contains mean of first window
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in islice(seq,M,N):
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
means.append(np.mean(d)) # mean for current window
return means
#*** Start of test area *****************************************************
#
# Set random seed for repeatability
#np.random.seed(7919) # the 1000th prime number
# Try the following sequences
#yn = np.random.random(18)*10 - 5
yn = [3,2,-1.0,2.0,3.0,5.0,-5.0,6.0,-5.0,4.0,9.0,6.3,1.3,0.0,-7.0,1.3,-5.0]
#yn = [3,2,-1.0,-1.0,-1.0,5.0,5.0]
#yn = [5,5.0,5,5.0,5]
#yn = [3,3,3,3,3,3,3]
#yn = [3.,3.,3.,3.,3.,3.,3.]
#yn = [-1,1,-1,1,-1,1]
#yn = [3,2,-1.0,2.0]
#yn = [5,3,2,2,-1]
#yn = [-5.0,3.0,2.0,2.0,-1.0]
N = len(yn)
# Try the follwing window sizes
#M = 1
#M = 2
M = 3
#M = 4
#M = 5
#M = 6
if M <= N and M >= 1:
print 'M = %2d,'%M,
print 'yn:'
print yn
means = RunningMean(yn,N,M)
print ' Means(%d):' %(N-M+1)
print means
medians = RunningMedian(yn,M)
print ' Medians(%d):' %(N-M+1)
print medians
modes = RunningMode(yn,N,M)
print ' Modes(%d):' %(N-M+1)
print modes
else:
print 'Window size (M=%d) out of range'%M
|
These functions are not optimal in anyway; however, they are based on available Python packages and well commented (IMHO) for ease of understanding. This code might be a useful prototype for similar "running" estimations. Such methods are often useful for the analysis of large data sets. Note, that there are no checks on the validity of the input data --- I leave this to those who wish to use the code.
And of course, any feedback (positive or negative) is welcomed :-)