A quick and simple implementation of the expectation-maximization (EM) algorithm for a Gaussian mixture model (all clusters share the same covariance shape).
import math, random, copy
import sys  # needed for sys.float_info and sys.maxint below
import numpy as np
def expectation_maximization(t, nbclusters=2, nbiter=3, normalize=False,\
epsilon=0.001, monotony=False, datasetinit=True):
"""
    Each row of t is an observation, each column is a feature.
    'nbclusters' is the number of seeds and so of clusters
    'nbiter' is the number of random restarts of EM; the best run is kept
    'epsilon' is the convergence bound/criterion
    'normalize' rescales each feature to [0, 1] before clustering
    'monotony' additionally stops the EM loop as soon as the log estimate
    stops decreasing
    'datasetinit' draws the initial means from the dataset itself (instead of
    uniformly at random within each feature's range)
    Overview of the algorithm:
    -> Draw nbclusters sets of (μ, σ, P_{#cluster}) at random (Gaussian
       Mixture) [P(Cluster=0) = P_0 = (1/n).∑_{obs} P(Cluster=0|obs)]
    -> [E] Compute P(Cluster|obs) for each obs:
       P(Cluster=0|obs)^t = P(obs|Cluster=0)*P(Cluster=0)^t
                            / ∑_c P(obs|Cluster=c)*P(Cluster=c)^t
    -> [M] Recalculate the mixture parameters with these new estimates:
       * P(Cluster=0)^{t+1} = (1/n).∑_{obs} P(Cluster=0|obs)
       * μ^{t+1}_0 = ∑_{obs} obs.P(Cluster=0|obs) / ∑_{obs} P(Cluster=0|obs)
       * σ^{t+1}_0 = ∑_{obs} P(Cluster=0|obs).(obs-μ^{t+1}_0)^2
                     / ∑_{obs} P(Cluster=0|obs)
    -> Compute the log estimate E_t = ∑_{obs} log(P(obs)^t)
    Repeat the E and M steps until |E_t - E_{t-1}| < ε
"""
def pnorm(x, m, s):
"""
Compute the multivariate normal distribution with values vector x,
mean vector m, sigma (variances/covariances) matrix s
"""
xmt = np.matrix(x-m).transpose()
        for i in xrange(len(s)):
            if s[i,i] <= sys.float_info.min: # clamp null/negative variances
                s[i,i] = sys.float_info.min
sinv = np.linalg.inv(s)
xm = np.matrix(x-m)
return (2.0*math.pi)**(-len(x)/2.0)*(1.0/math.sqrt(np.linalg.det(s)))\
*math.exp(-0.5*(xm*sinv*xmt))
def draw_params():
if datasetinit:
            # pick a random observation of the dataset as the initial mean
            tmpmu = np.array([1.0*t[random.randint(0,nbobs-1),:]],np.float64)
else:
tmpmu = np.array([random.uniform(min_max[f][0], min_max[f][1])\
for f in xrange(nbfeatures)], np.float64)
return {'mu': tmpmu,\
'sigma': np.matrix(np.diag(\
[(min_max[f][1]-min_max[f][0])/2.0\
for f in xrange(nbfeatures)])),\
'proba': 1.0/nbclusters}
nbobs = t.shape[0]
nbfeatures = t.shape[1]
min_max = []
    # find the (min, max) range of each feature
for f in xrange(nbfeatures):
min_max.append((t[:,f].min(), t[:,f].max()))
### Normalization
if normalize:
for f in xrange(nbfeatures):
t[:,f] -= min_max[f][0]
t[:,f] /= (min_max[f][1]-min_max[f][0])
min_max = []
for f in xrange(nbfeatures):
min_max.append((t[:,f].min(), t[:,f].max()))
### /Normalization
result = {}
    quality = 0.0 # negative log-likelihood of the current clustering (lower is better)
random.seed()
Pclust = np.ndarray([nbobs,nbclusters], np.float64) # P(clust|obs)
Px = np.ndarray([nbobs,nbclusters], np.float64) # P(obs|clust)
# iterate nbiter times searching for the best "quality" clustering
for iteration in xrange(nbiter):
##############################################
# Step 1: draw nbclusters sets of parameters #
##############################################
params = [draw_params() for c in xrange(nbclusters)]
old_log_estimate = sys.maxint # init, not true/real
log_estimate = sys.maxint/2 + epsilon # init, not true/real
estimation_round = 0
# Iterate until convergence (EM is monotone) <=> < epsilon variation
while (abs(log_estimate - old_log_estimate) > epsilon\
and (not monotony or log_estimate < old_log_estimate)):
restart = False
old_log_estimate = log_estimate
########################################################
# Step 2: compute P(Cluster|obs) for each observations #
########################################################
for o in xrange(nbobs):
for c in xrange(nbclusters):
# Px[o,c] = P(x|c)
Px[o,c] = pnorm(t[o,:],\
params[c]['mu'], params[c]['sigma'])
#for o in xrange(nbobs):
# Px[o,:] /= math.fsum(Px[o,:])
for o in xrange(nbobs):
for c in xrange(nbclusters):
# Pclust[o,c] = P(c|x)
Pclust[o,c] = Px[o,c]*params[c]['proba']
# assert math.fsum(Px[o,:]) >= 0.99 and\
# math.fsum(Px[o,:]) <= 1.01
for o in xrange(nbobs):
tmpSum = 0.0
for c in xrange(nbclusters):
tmpSum += params[c]['proba']*Px[o,c]
Pclust[o,:] /= tmpSum
#assert math.fsum(Pclust[:,c]) >= 0.99 and\
# math.fsum(Pclust[:,c]) <= 1.01
###########################################################
# Step 3: update the parameters (sets {mu, sigma, proba}) #
###########################################################
print "iter:", iteration, " estimation#:", estimation_round,\
" params:", params
for c in xrange(nbclusters):
tmpSum = math.fsum(Pclust[:,c])
params[c]['proba'] = tmpSum/nbobs
if params[c]['proba'] <= 1.0/nbobs: # restart if all
restart = True # converges to
print "Restarting, p:",params[c]['proba'] # one cluster
break
m = np.zeros(nbfeatures, np.float64)
for o in xrange(nbobs):
m += t[o,:]*Pclust[o,c]
params[c]['mu'] = m/tmpSum
s = np.matrix(np.diag(np.zeros(nbfeatures, np.float64)))
for o in xrange(nbobs):
s += Pclust[o,c]*(np.matrix(t[o,:]-params[c]['mu']).transpose()*\
np.matrix(t[o,:]-params[c]['mu']))
#print ">>>> ", t[o,:]-params[c]['mu']
#diag = Pclust[o,c]*((t[o,:]-params[c]['mu'])*\
# (t[o,:]-params[c]['mu']).transpose())
#print ">>> ", diag
#for i in xrange(len(s)) :
# s[i,i] += diag[i]
params[c]['sigma'] = s/tmpSum
print "------------------"
print params[c]['sigma']
### Test bound conditions and restart consequently if needed
if not restart:
restart = True
for c in xrange(1,nbclusters):
if not np.allclose(params[c]['mu'], params[c-1]['mu'])\
or not np.allclose(params[c]['sigma'], params[c-1]['sigma']):
restart = False
break
            if restart: # restart if all clusters collapsed into a single one
old_log_estimate = sys.maxint # init, not true/real
log_estimate = sys.maxint/2 + epsilon # init, not true/real
params = [draw_params() for c in xrange(nbclusters)]
continue
### /Test bound conditions and restart
####################################
# Step 4: compute the log estimate #
####################################
log_estimate = math.fsum([math.log(math.fsum(\
[Px[o,c]*params[c]['proba'] for c in xrange(nbclusters)]))\
for o in xrange(nbobs)])
print "(EM) old and new log estimate: ",\
old_log_estimate, log_estimate
estimation_round += 1
# Pick/save the best clustering as the final result
        quality = -log_estimate # lower quality <=> higher log-likelihood
        if 'quality' not in result or quality < result['quality']:
            result['quality'] = quality
            result['params'] = copy.deepcopy(params)
result['clusters'] = [[o for o in xrange(nbobs)\
if Px[o,c] == max(Px[o,:])]\
for c in xrange(nbclusters)]
return result
This clusters observations, given their features, under a Gaussian mixture model in which all clusters share the same covariance matrix shape.
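A minimal sketch of how the function might be called; the synthetic two-blob dataset and the parameter values below are made up for illustration:

import numpy as np

# two well-separated 2-D blobs of 100 points each (synthetic data)
np.random.seed(0)
blob1 = np.random.randn(100, 2) + np.array([0.0, 0.0])
blob2 = np.random.randn(100, 2) + np.array([8.0, 8.0])
t = np.vstack([blob1, blob2])

result = expectation_maximization(t, nbclusters=2, nbiter=3, epsilon=0.001)
print(result['quality'])                     # negative log-likelihood of the best run
print(result['params'][0]['mu'])             # estimated mean of the first cluster
print([len(c) for c in result['clusters']])  # observation counts per cluster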
References: the excellent Information Theory, Inference, and Learning Algorithms by David MacKay, http://www.inference.phy.cam.ac.uk/mackay/itila/book.html (free PDF), and http://en.wikipedia.org/wiki/Expectation-maximization_algorithm
Hi, I am testing the above code snippet, and this is how I use it:
However, I got the following error:
numpy.linalg.linalg.LinAlgError: Singular matrix
Why is this happening? How should I fix this? The given matrix is a very simple one and the algorithm should be able to deal with it, right?
Same problem here, any solution?
You have too few data points. Try increasing the number of data points to at least 100 or so.
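The error itself comes from np.linalg.inv(s) in pnorm: with very few observations (or observations lying on a line), a cluster's covariance estimate can become singular. Besides adding data, one possible workaround, not part of the original snippet, is to add a small ridge term to the covariance diagonal before inverting it. A self-contained sketch (the ridge value is a hypothetical tuning constant):

import numpy as np

def regularize_cov(s, ridge=1e-6):
    # add a small ridge to the diagonal so the covariance stays invertible;
    # `ridge` is a hypothetical tuning value, not part of the original snippet
    s = np.asarray(s, dtype=np.float64)
    return s + ridge * np.eye(s.shape[0])

# example: a singular 2x2 covariance (both features identical)
s = np.array([[1.0, 1.0],
              [1.0, 1.0]])
print(np.linalg.det(s))                  # 0.0 -> np.linalg.inv(s) would fail
print(np.linalg.inv(regularize_cov(s)))  # now invertible

Inside pnorm, s could be replaced by regularize_cov(s) before computing sinv and the determinant.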