{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "name": "Readme.ipynb", "provenance": [], "collapsed_sections": [], "include_colab_link": true }, "kernelspec": { "name": "python3", "display_name": "Python 3" } }, "cells": [ { "cell_type": "markdown", "metadata": { "id": "view-in-github", "colab_type": "text" }, "source": [ "<a href=\"https://colab.research.google.com/github/mhdelta/distributed-gmm/blob/master/Readme.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" ] }, { "cell_type": "markdown", "metadata": { "id": "wlukD_tFxZWe", "colab_type": "text" }, "source": [ "# Distributed Gaussian Mixture Model\n", " \n", "This repository provides a distributed adaptation to the Tobias Schlagenhauf [code](https://www.python-course.eu/expectation_maximization_and_gaussian_mixture_models)\n", "for unsupervised clustering algorithm: [**Gaussian Mixture Models**](https://scikit-learn.org/stable/modules/mixture.html)\n", " \n", "This approach uses the zeroMQ push pull model \n", " \n", "![alt text](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/pushpull.png \"Logo Title Text 1\")\n", " \n", "However after leaving the result collector, process will start again with new parameters until model has converged.\n", "The basic idea goes like this: \n", "1. Random parameters are generated in the producer. $\\bf {\\mu, \\Sigma}$\n", "2. Producer sends pieces of the dataset $\\bf{X}$ and the parameters to the consumers.\n", "---\n", " **E-STEP**\n", "3. Consumers calculate the following ![alt text](https://latex.codecogs.com/gif.latex?r_%7Bic%7D%20%3D%20%5Cfrac%7B%5Cpi_c%20N%28%5Cboldsymbol%7Bx_i%7D%20%5C%20%7C%20%5C%20%5Cboldsymbol%7B%5Cmu_c%7D%2C%5Cboldsymbol%7B%5CSigma_c%7D%29%7D%7B%5CSigma_%7Bk%3D1%7D%5EK%20%5Cpi_k%20N%28%5Cboldsymbol%7Bx_i%20%5C%20%7C%20%5C%20%5Cboldsymbol%7B%5Cmu_k%7D%2C%5Cboldsymbol%7B%5CSigma_k%7D%7D%29%7D)\n", "which is a piece of the vector ric containing\n", "![alt text](https://latex.codecogs.com/gif.latex?%5Cfrac%7BProbability%20%5C%20that%20%5C%20x_i%20%5C%20belongs%20%5C%20to%20%5C%20class%20%5C%20c%7D%7BProbability%20%5C%20of%20%5C%20x_i%20%5C%20over%20%5C%20all%20%5C%20classes%7D)\n", " \n", "4. Consumers report the ric piece that they calculated to the result collector\n", "---\n", " \n", " **M-STEP**\n", "5. 
In essence, what **producer/vent.py** should be doing is:

```python
import numpy as np
from scipy.stats import multivariate_normal

X = initializeDataSet()
vent.setRandomVariables()
log_likelihoods = []
converged = False
while not converged:
    # Receive the parameters computed by the sink in the previous round
    vent.receiveParams(vent.sinkSocket.recv_json())
    # Log-likelihood of the dataset under the current parameters
    log_likelihoods.append(np.log(np.sum(
        [k * multivariate_normal(vent.mu[i], vent.cov[j]).pdf(X)
         for k, i, j in zip(vent.pi, range(len(vent.mu)), range(len(vent.cov)))]
    )))
    # Stop once the log-likelihood has stopped improving
    if len(log_likelihoods) > 1:
        if (log_likelihoods[-1] - log_likelihoods[-2]) < 9e-21:
            converged = True
    vent.sendToMappers()  # sends a piece of X, mu and sigma to each worker
vent.plotState()
print(vent.predict(Xtest[0]))
```

---

**consumer/worker.py** (the receiving socket name below is illustrative):

```python
while True:
    # Pull the next task: a chunk of X plus the current mu and cov
    msg = ventSocket.recv_json()
    worker = Worker(
        msg['x'],
        msg['mu'],
        msg['cov'],
    )
    ric = worker.Estep()
    worker.sendToSink(ric)  # push this chunk's responsibilities to the sink
```

---

and last, **result collector/sink.py**:

```python
samples_received = 0
while True:
    msg = workerSocket.recv_json()
    samples_received += 1
    if samples_received == 1:
        # The first chunk of the round starts the buffers
        X = msg['X']
        ric = msg['ric']
    else:
        # Later chunks are appended in arrival order
        X = np.concatenate((X, msg['X']))
        ric = np.concatenate((ric, msg['ric']))
    if samples_received == n_samples:
        # Every piece of ric has arrived: run the M-step and report back
        sink = Sink(X, ric)
        sink.sendToVent(sink.Mstep())
        samples_received = 0
```

This model is also complemented with inner parallelism in each worker, using threads, so that large datasets can be computed and classified while taking advantage of several computers and their processors (a small sketch is given after the figures below). Results are shown with blobs as in Tobias's original article; the output does not change, only the process of obtaining the classified model does.

![Initial state](https://github.com/mhdelta/distributed-gmm/blob/master/img/ini_state.png?raw=true)

Log-likelihood through iterations:

![Log-likelihood through iterations](https://github.com/mhdelta/distributed-gmm/blob/master/img/logl.png?raw=true)

![End state](https://github.com/mhdelta/distributed-gmm/blob/master/img/end_state.png?raw=true)
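The thread-level parallelism mentioned above could look roughly like this, reusing the hypothetical `e_step` helper from the sketch earlier in this README: a worker splits its chunk of $\mathbf{X}$ across a thread pool and stitches the partial responsibilities back together. This is an illustration, not the repository's actual `Worker` internals.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def threaded_e_step(X, pi, mu, cov, n_threads=4):
    """Run e_step on row-chunks of X in parallel and concatenate the results."""
    chunks = np.array_split(X, n_threads)
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        parts = pool.map(lambda chunk: e_step(chunk, pi, mu, cov), chunks)
    return np.concatenate(list(parts))
```

Because the heavy lifting happens inside NumPy and SciPy, which release the GIL for much of their work, the threads can overlap useful computation.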