Gaussian mixture model

Sun 25 April 2021

In [16]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
plt.rcParams['figure.figsize'] = [12, 8]
sns.set()
sns.set_style('ticks')
sns.set_palette("Dark2")
np.set_printoptions(suppress=True)
from numpy import genfromtxt
import numpy.linalg as LA
from scipy.stats import multivariate_normal as multi_norm
from matplotlib.patches import Ellipse
In [17]:
faithful_df = genfromtxt('data/faithful.csv', delimiter=',')
X = faithful_df[1:, 1:]  # drop the header row and the index column
In [18]:
plt.scatter(X[:,0],X[:,1])
plt.ylim(top=100)
plt.xlim(right=6.0)
plt.ylabel('waiting')
plt.xlabel('eruptions')
plt.title('Scatter plot of the Old Faithful data')
plt.show()
In [19]:
class GMM:
    """Gaussian mixture model fitted with the EM algorithm."""

    def __init__(self, X, K, max_iter=5):
        self.K = K                     # number of mixture components
        self.max_iter = int(max_iter)  # number of EM iterations
        self.X = X
        self.n, self.m = X.shape       # n data points, m features
    
    def initialize(self):
        # Uniform mixture weights and an empty responsibility matrix.
        self.pis = np.full(shape=self.K, fill_value=1 / self.K)
        self.qs = np.zeros((self.n, self.K))

        rng = np.random.default_rng()
        # Means: K distinct data points chosen at random.
        self.mus = self.X[rng.choice(self.n, size=self.K, replace=False)]
        # Covariances: large diagonal matrices plus a small symmetric perturbation.
        jitter = rng.random((self.K, self.m, self.m))
        self.sigmas = np.identity(self.m) * 50 + (jitter + jitter.transpose(0, 2, 1)) / 2
        
    def compute_responsibility(self):
        # Responsibilities q[i, k] = P(z_i = k | x_i) under the current parameters.
        likelihood = np.zeros((self.n, self.K))
        for k in range(self.K):
            distribution = multi_norm(mean=self.mus[k], cov=self.sigmas[k], allow_singular=True)
            likelihood[:, k] = distribution.pdf(self.X)
        numerator = likelihood * self.pis
        denominator = numerator.sum(axis=1).reshape(-1, 1)
        return numerator / denominator
        
    def e_step(self):
        self.qs = self.compute_responsibility()

    def m_step(self):
        # Update the weights, means, and covariances from the current responsibilities.
        self.pis = np.mean(self.qs, axis=0)
        for k in range(self.K):
            q_k = self.qs[:, [k]]
            q_k_sum = q_k.sum()

            self.mus[k] = np.sum(self.X * q_k, axis=0) / q_k_sum
            self.sigmas[k] = np.cov(self.X.T, aweights=(q_k / q_k_sum).flatten(), bias=True)

    def fit(self):
        self.initialize()

        for _ in range(self.max_iter):
            self.e_step()
            self.m_step()
        
    def predict(self):
        # Hard assignment: the most responsible component for each data point.
        qs = self.compute_responsibility()
        return np.argmax(qs, axis=1)
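For reference, the two steps above are the standard EM updates for a Gaussian mixture. In the E-step the responsibilities are

$$q_{ik} = \frac{\pi_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k)}{\sum_{j=1}^{K} \pi_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)},$$

and in the M-step the parameters are re-estimated as

$$\pi_k = \frac{1}{n}\sum_{i=1}^{n} q_{ik}, \qquad \mu_k = \frac{\sum_i q_{ik}\, x_i}{\sum_i q_{ik}}, \qquad \Sigma_k = \frac{\sum_i q_{ik}\,(x_i - \mu_k)(x_i - \mu_k)^\top}{\sum_i q_{ik}},$$

which is what `compute_responsibility` and `m_step` compute with NumPy.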
In [28]:
gmm = GMM(X=X, K=2, max_iter=50)
gmm.fit()
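A quick look at the fitted parameters (a small addition, using the attribute names defined in the class above):

In [ ]:
# Inspect the fitted mixture weights, means, and covariances.
print('weights:\n', gmm.pis)
print('means:\n', gmm.mus)
print('covariances:\n', gmm.sigmas)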
In [29]:
def draw_ellipse(position, covariance, ax=None, **kwargs):
    """Draw an ellipse with a given position and covariance"""
    ax = ax or plt.gca()
    
    # Convert covariance to principal axes
    if covariance.shape == (2, 2):
        U, s, Vt = np.linalg.svd(covariance)
        angle = np.degrees(np.arctan2(U[1, 0], U[0, 0]))
        width, height = 2 * np.sqrt(s)
    else:
        angle = 0
        width, height = 2 * np.sqrt(covariance)
    
    # Draw the 1-, 2-, and 3-sigma ellipses
    for nsig in range(1, 4):
        ax.add_patch(Ellipse(position, nsig * width, nsig * height,
                             angle=angle, **kwargs))
        
def plot_gmm(gmm, X, label=True, ax=None):
    ax = ax or plt.gca()
    labels = gmm.predict()
    if label:
        ax.scatter(X[:, 0], X[:, 1], c=labels, zorder=2, cmap='Dark2')
    else:
        ax.scatter(X[:, 0], X[:, 1], zorder=2)
    
    w_factor = 0.2 / gmm.pis.max()
    for pos, covar, w in zip(gmm.mus, gmm.sigmas, gmm.pis):
        draw_ellipse(pos, covar, alpha=w * w_factor)
    plt.ylabel('waiting')
    plt.xlabel('eruptions')
    plt.title('EM cluster assignments and Gaussian mixture contours for the Old Faithful data')
    
In [30]:
plot_gmm(gmm,X)
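As a minimal sanity check (not part of the plot above), the log-likelihood of the data under the fitted parameters can be computed with the same `multi_norm` distribution used in the E-step; EM is guaranteed not to decrease this quantity from one iteration to the next.

In [ ]:
# Log-likelihood of the data under the fitted mixture.
likelihood = np.zeros((X.shape[0], gmm.K))
for k in range(gmm.K):
    likelihood[:, k] = multi_norm(mean=gmm.mus[k], cov=gmm.sigmas[k]).pdf(X)
print('log-likelihood:', np.sum(np.log(likelihood @ gmm.pis)))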