Skip to content

Instantly share code, notes, and snippets.

@simeoncarstens
Last active January 27, 2020 14:21
Show Gist options
  • Save simeoncarstens/3d3cc9bb340edeecf21bb58f419da860 to your computer and use it in GitHub Desktop.
Save simeoncarstens/3d3cc9bb340edeecf21bb58f419da860 to your computer and use it in GitHub Desktop.
Importance Sampling, Sequential Monte Carlo, Particle Filtering and stuff
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Importance Sampling, Sequential Importance Sampling and Particle Filters\n",
"Matthias, Yves, Leo and I, we recently discussed above sampling algorithms in the context of `monad-bayes`. To get a better understanding, I decided to implement them in Python."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Importance sampling\n",
"... is a very general sampling method. For once, it has nothing to do with Markov chains and not even with Monte Carlo. \n",
"Let's say you want to calculate the expectation of a function $f(x)$ w.r.t. a probability distribution $p(x)$. It is given by the following integral:\n",
"\\begin{equation}\n",
" \\langle f \\rangle_{p} = \\int \\mathrm d x \\ f(x) p(x)\n",
"\\end{equation}\n",
"Now in general, you might not be able to calculate this analytically, so you resort to numerics. If you could sample correctly from $p(x)$, you could draw $N$ samples $x_1, \\ldots, x_N$ and approximate the integral by a sum:\n",
"\\begin{equation}\n",
" \\langle f \\rangle_{p} \\approx \\frac{1}{N} \\sum_{i=1}^N f(x_i) p(x_i)\n",
"\\end{equation}\n",
"The world unfortunately sometimes is kind of a bad place and sampling from $p(x)$ might not be possible. But you might be able to sample from a different distribution $q(x)$. Then, doing what physicists have been doing all their lifes, you insert a 1 and write:\n",
"\\begin{align}\n",
" \\langle f \\rangle_{p} &= \\int \\mathrm d x \\ f(x) \\frac{p(x)}{q(x)}q(x) \\\\\n",
" &= \\int \\mathrm d x \\ f(x) w(x) q(x) \\\\\n",
" &= \\langle f\\cdot w \\rangle_q\n",
"\\end{align}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here, we introduced the *importance weight* $w(x)=\\frac{p(x)}{q(x)}$ and we see that we're now calculating the expectation value of a different functon w.r.t. a different distribution. That expectation value can now be again approximated, supposed we draw $N$ samples $x_1, \\ldots, x_N$ from $q$ (confusing, right?):\n",
"\\begin{equation}\n",
" \\langle f \\rangle_p = \\langle f\\cdot w \\rangle_q \\approx \\frac{1}{N}\\sum_{i=1}^N f(x_i) w(x_i)\n",
"\\end{equation}\n",
"TODO: I'm not clear why we have to normalize the importants weights (which I haven't done here, but later in the code I do)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Voili, voilà—this is called *importance sampling*, because instead from $p$, we sample from an *importance distribution* $q$. Now you see a problem here: if the distributions $p$ and $q$ don't match up well (and in general, they won't), you will draw many samples from $q$ which have low importance weights $w(x_i)$ and thus contribute only little to the approximation, meaning you will need a lot of samples and still might get a bad estimate."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Implementing importance sampling\n",
"Let's first import useful stuff and define a few PDFs:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from scipy.special import logsumexp\n",
"choice = np.random.choice\n",
"\n",
"\n",
"class Pdf(object):\n",
"\n",
" def __call__(self, x):\n",
" \"\"\"log-probability\"\"\"\n",
" pass\n",
"\n",
" def sample(self, n):\n",
" pass\n",
"\n",
"\n",
"\n",
"class Gaussian(Pdf):\n",
"\n",
" def __init__(self, mu=0, sigma=1):\n",
"\n",
" self.mu = mu\n",
" self.sigma = sigma\n",
"\n",
" def __call__(self, x):\n",
"\n",
" return -0.5 * (x - self.mu) ** 2 / self.sigma / self.sigma\n",
"\n",
" def sample(self, n):\n",
"\n",
" return np.random.normal(self.mu, self.sigma, n)\n",
"\n",
"\n",
"class Uniform(Pdf):\n",
"\n",
" def __init__(self, low, high):\n",
"\n",
" self.low = low\n",
" self.high = high\n",
"\n",
" def __call__(self, x):\n",
"\n",
" return np.repeat(-np.log(self.high - self.low), len(x))\n",
"\n",
" def sample(self, n):\n",
"\n",
" return np.random.uniform(self.low, self.high, n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we whip up a class implementing Importance Sampling. Its `sample` method will return the samples from the importance distribution and the corresponding importance weights. Also, as you might have noticed, we work in log-space for numeric stability."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"class ImportanceSampler(object):\n",
"\n",
" def __init__(self, posterior):\n",
"\n",
" self.posterior = posterior\n",
"\n",
" def sample(self, imp_dist, n_samples):\n",
"\n",
" imp_samples = imp_dist.sample(n_samples)\n",
" imp_weights = self.calculate_weights(imp_dist, imp_samples)\n",
" # doing stuff in log-space to avoid underflows\n",
" normalized_imp_weights = imp_weights - logsumexp(imp_weights)\n",
"\n",
" return imp_samples, normalized_imp_weights\n",
"\n",
" def calculate_weights(self, imp_dist, imp_samples):\n",
"\n",
" return self.posterior(imp_samples) - imp_dist(imp_samples)\n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's try this out. We want to sample from a Gaussian distribution, but we pretend we cannot, so we sample from a uniform importance distribution instead:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"n_samples = 200000\n",
"target = Gaussian()\n",
"imp = Uniform(-10, 10)\n",
"sampler = ImportanceSampler(target)\n",
"biased_samples, logws = sampler.sample(imp, n_samples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now I've been talking a lot about approximating expectation values, but for personal reasons, I'm actually more interested in the actual samples. So how do you get samples from the target distribution from the importance samples and their weights? Well, you just draw from the set of importance samples, but with probabilities given by the importance weights:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"samples = choice(biased_samples, n_samples, p=np.exp(logws))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Gesagt, getan, so now we can plot a histogram of the actual (unbiased) samples:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "iVBORw0KGgoAAAANSUhEUgAAAYMAAAD4CAYAAAAO9oqkAAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAALEgAACxIB0t1+/AAAADh0RVh0U29mdHdhcmUAbWF0cGxvdGxpYiB2ZXJzaW9uMy4xLjEsIGh0dHA6Ly9tYXRwbG90bGliLm9yZy8QZhcZAAAPu0lEQVR4nO3df6zddX3H8edrrTgzNVTbdaRtVqL9p7qt6k1pwpIxWUoBs2JiDCyTzhFrYkkgYZlV/6hRSWoWdSNTlqqNJWNWIhoaqasdITH+AfaCFSjIuMEy2hR6tSguJpq69/44nzvO6u29p/fX97b3+UhOzve8v5/v97zPN9DX/f46J1WFJGlh+52uG5Akdc8wkCQZBpIkw0CShGEgSQIWd93AVC1durRWr17ddRuSdF555JFHflJVy86sn7dhsHr1aoaHh7tuQ5LOK0meG6/uYSJJkmEgSRogDJKsSvJgkieTHElyS6t/PMnxJIfb45q+ZT6SZCTJ00mu6qtvarWRJNv76pcmebjVv5bkopn+oJKksxtkz+A0cFtVrQU2ANuSrG3zPldV69pjP0Cbdz3wFmAT8IUki5IsAj4PXA2sBW7oW8+n27reDLwE3DRDn0+SNIBJw6CqTlTVo236F8BTwIoJFtkM7K2qX1XVj4ERYH17jFTVs1X1a2AvsDlJgHcCX2/L7wGum+oHkiSdu3M6Z5BkNfA24OFWujnJY0l2J1nSaiuA5/sWO9ZqZ6u/EfhZVZ0+oz7e+29NMpxkeHR09FxalyRNYOAwSPJa4F7g1qp6GbgTeBOwDjgBfGZWOuxTVbuqaqiqhpYt+63LZCVJUzTQfQZJXkUvCO6uqm8AVNWLffO/CHyrvTwOrOpbfGWrcZb6T4GLkyxuewf94yVJc2CQq4kCfBl4qqo+21e/pG/Yu4En2vQ+4Pokr05yKbAG+D5wCFjTrhy6iN5J5n3V+0GFB4H3tOW3APdN72NJks7FIHsGlwPvAx5PcrjVPkrvaqB1QAFHgQ8CVNWRJPcAT9K7EmlbVf0GIMnNwAFgEbC7qo609X0Y2JvkU8AP6IWPdF5avf3+gcYd3XntLHciDW7SMKiq7wEZZ9b+CZa5Hbh9nPr+8ZarqmfpXW0kSeqAdyBLkgwDSZJhIEnCMJAkcR7/noE01wa9Skg6H7lnIEkyDCRJhoEkCcNAkoRhIEnCMJAkYRhIkjAMJEkYBpIkDANJEoaBJAnDQJKEYSBJwjCQJGEYSJIwDCRJ+OM2Umc/WjPo+x7dee0sdyK5ZyBJwjCQJGEYSJIwDCRJGAaSJAwDSRKGgSQJw0CShGEgScIwkCRhGEiSGCAMkqxK8mCSJ5McSXJLq78hycEkz7TnJa2eJHckGUnyWJK3961rSxv/TJItffV3JHm8LXNHkszGh5UkjW+QPYPTwG1VtRbYAGxLshbYDjxQVWuAB9prgKuBNe2xFbgTeuEB7AAuA9YDO8YCpI35QN9ym6b/0SRJg5o0DKrqRFU92qZ/ATwFrAA2A3vasD3AdW16M3BX9TwEXJzkEuAq4GBVnaqql4CDwKY27/VV9VBVFXBX37okSXPgnM4ZJFkNvA14GFheVSfarBeA5W16BfB832LHWm2i+rFx6uO9/9Ykw0mGR0dHz6V1SdIEBg6DJK8F7gVuraqX++e1v+hrhnv7LVW1q6qGqmpo2bJls/12krRgDBQGSV5FLwjurqpvtPKL7RAP7flkqx8HVvUtvrLVJqqvHKcuSZojg1xNFODLwFNV9dm+WfuAsSuCtgD39dVvbFcVbQB+3g4nHQA2JlnSThxvBA60eS8n2dDe68a+dUmS5sAgP3t5OfA+4PEkh1vto8BO4J4kNwHPAe9t8/YD1wAjwC+B9wNU1akknwQOtXGfqKpTbfpDwFeA1wDfbg9J0hyZNAyq6nvA2a77v3Kc8QVsO8u6dgO7x6kPA2+drBdJ0uzwDmRJkmEgSTIMJEkYBpIkDANJEoaBJAnDQJKEYSBJwjCQJDHY11FI6tDq7fcPPPbozmtnsRNdyAwDXbDO5R9RaaHzMJEkyTCQJBkGkiQMA0kShoEkCcNAkoRhIEnCMJAkYRhIkjAMJEkYBpIkDANJEoaBJAnDQJKEYSBJwjCQJGEYSJIwDCRJGAaSJAwDSRKGgSQJw0CSxABhkGR3kpNJnuirfTzJ8SSH2+OavnkfSTKS5OkkV/XVN7XaSJLtffVLkzzc6l9LctFMfkBJ0uQG2TP4CrBpnPrnqmpde+wHSLIWuB54S1vmC0kWJVkEfB64GlgL3NDGAny6revNwEvATdP5QJKkczdpGFTVd4FTA65vM7C3qn5VVT8GRoD17TFSVc9W1a+BvcDmJAHeCXy9Lb8HuO4cP4MkaZqmc87g5iSPtcNIS1ptBfB835hjrXa2+huBn1XV6TPq40qyNclwkuHR0dFptC5J6jfVMLgTeBOwDjgBfGbGOppAVe2qqqGqGlq2bNlcvKUkLQiLp7JQVb04Np3ki8C32svjwKq+oStbjbPUfwpcnGRx2zvoHy9JmiNT2jNIcknfy3cDY1ca7QOuT/LqJJcCa4DvA4eANe3KoYvonWTeV1UFPAi8py2/BbhvKj1JkqZu0j2DJF8FrgCWJjkG7ACuSLIOKOAo8EGAqjqS5B7gSeA0sK2qftPWczNwAFgE7K6qI+0tPgzsTfIp4AfAl2fs00mSBjJpGFTVDeOUz/oPdlXdDtw+Tn0/sH+c+rP0rjaSJHXEO5AlSYaBJMkwkCRhGEiSMAwkSUzxpjOpS6u33991C9IFxz0DSZJhIEkyDCRJeM5AuqAMej7l6M5rZ7kTnW/cM5AkGQaSJMNAkoRhIEnCMJAkYRhIkjAMJEkYBpIkDANJEoaBJAnDQJKEYSBJwjCQJGEYSJIwDCRJGAaSJAwDSRKGgSQJw0CShGEgScIwkCRhGEiSMAwkSQwQBkl2JzmZ5Im+2huSHEzyTHte0upJckeSkSSPJXl73zJb2vhnkmzpq78jyeNtmTuSZKY/pCRpYoPsGXwF2HRGbTvwQFWtAR5orwGuBta0x1bgTuiFB7ADuAxYD+wYC5A25gN9y535XpKkWTZpGFTVd4FTZ5Q3A3va9B7gur76XdXzEHBxkkuAq4CDVXWqql4CDgKb2rzXV9VDVVXAXX3rkiTNkameM1heVSfa9AvA8ja9Ani+b9yxVpuofmycuiRpDk37BHL7i75moJdJJdmaZDjJ8Ojo6Fy8pSQtCFMNgxfbIR7a88lWPw6s6hu3stUmqq8cpz6uqtpVVUNVNbRs2bIpti5JOtNUw2AfMHZF0Bbgvr76je2qog3Az9vhpAPAxiRL2onjjcCBNu/lJBvaVUQ39q1LkjRHFk82IMlXgSuApUmO0bsqaCdwT5KbgOeA97bh+4FrgBHgl8D7AarqVJJPAofauE9U1dhJ6Q/Ru2LpNcC320MLzOrt93fdgrSgpXfI//wzNDRUw8PDXbehGWIYzE9Hd17bdQuaYUkeqaqhM+vegSxJMgwkSYaBJAnDQJKEYSBJwjCQJGEYSJIwDCRJGAaSJAwDSRKGgSQJw0CShGEgScIwkCRhGEiSMAwkSRgGkiQMA0kShoEkCcNAkoRhIEnCMJAkYRhIkjAMJEkYBpIkDANJEoaBJAlY3HUDurCt3n5/1y1IGoB7BpIkw0CS5GEiSRMY9DDf0Z3XznInmm3uGUiSDANJkmEgSWKaYZDkaJLHkxxOMtxqb0hyMMkz7XlJqyfJHUlGkjyW5O1969nSxj+TZMv0PpIk6VzNxJ7Bn1fVuqoaaq+3Aw9U1RrggfYa4GpgTXtsBe6EXngAO4DLgPXAjrEAkSTNjdk4TLQZ2NOm9wDX9dXvqp6HgIuTXAJcBRysqlNV9RJwENg0C31Jks5iumFQwHeSPJJka6str6oTbfoFYHmbXgE837fssVY7W/23JNmaZDjJ8Ojo6DRblySNme59Bn9aVceT/D5wMMmP+mdWVSWpab5H//p2AbsAhoaGZmy9krTQTWvPoKqOt+eTwDfpHfN/sR3+oT2fbMOPA6v6Fl/ZamerS5LmyJTDIMnvJXnd2DSwEXgC2AeMXRG0BbivTe8DbmxXFW0Aft4OJx0ANiZZ0k4cb2w1SdIcmc5houXAN5OMreffqurfkxwC7klyE/Ac8N42fj9wDTAC/BJ4P0BVnUrySeBQG/eJqjo1jb4kSedoymFQVc8CfzJO/afAlePUC9h2lnXtBnZPtRdJ0vR4B7IkyTCQJBkGkiQMA0kShoEkCX/pTFPkD91LFxb3DCRJ7hlImj5/K/n8556BJMkwkCQZBpIkDANJEoaBJAnDQJKEYSBJwjCQJGEYSJIwDCRJGAaSJAwDSRKGgSQJv7VUZ/B3CjSb/HbT+cs9A0mSYSBJMgwkSRgGkiQMA0kShoEkCS8tXTC8ZFTSRAwDSfOO9yPMPQ8TSZIMA0mSh4nOe54LkDQT5k0YJNkE/BOwCPhSVe3suCVJ89y5/DHk+YWJzYvDREkWAZ8HrgbWAjckWdttV5K0cMyXPYP1wEhVPQuQZC+wGXiy065mmId0pO7M9P9/F9qexnwJgxXA832vjwGXnTkoyVZga3v530menoPeJrMU+EnXTcwDbodXuC16LujtkE+f0/D5tC3+cLzifAmDgVTVLmBX1330SzJcVUNd99E1t8Mr3BY9bodXnA/bYl6cMwCOA6v6Xq9sNUnSHJgvYXAIWJPk0iQXAdcD+zruSZIWjHlxmKiqTie5GThA79LS3VV1pOO2BjWvDlt1yO3wCrdFj9vhFfN+W6Squu5BktSx+XKYSJLUIcNAkmQYzJQktyWpJEu77qUrSf4hyY+SPJbkm0ku7rqnuZRkU5Knk4wk2d51P11JsirJg0meTHIkyS1d99SlJIuS/CDJt7ruZSKGwQxIsgrYCPxX17107CDw1qr6Y+A/gY903M+c8StV/p/TwG1VtRbYAGxbwNsC4Bbgqa6bmIxhMDM+B/w9sKDPxlfVd6rqdHv5EL37RRaK//tKlar6NTD2lSoLTlWdqKpH2/Qv6P1DuKLbrrqRZCVwLfClrnuZjGEwTUk2A8er6odd9zLP/C3w7a6bmEPjfaXKgvwHsF+S1cDbgIe77aQz/0jvD8X/6bqRycyL+wzmuyT/AfzBOLM+BnyU3iGiBWGibVFV97UxH6N3qODuuexN80uS1wL3ArdW1ctd9zPXkrwLOFlVjyS5out+JmMYDKCq/mK8epI/Ai4FfpgEeodFHk2yvqpemMMW58zZtsWYJH8DvAu4shbWTSx+pUqfJK+iFwR3V9U3uu6nI5cDf5nkGuB3gdcn+deq+uuO+xqXN53NoCRHgaGqmi/fTjin2g8UfRb4s6oa7bqfuZRkMb2T5lfSC4FDwF+dR3fSz5j0/jLaA5yqqlu77mc+aHsGf1dV7+q6l7PxnIFm0j8DrwMOJjmc5F+6bmiutBPnY1+p8hRwz0IMguZy4H3AO9t/B4fbX8eax9wzkCS5ZyBJMgwkSRgGkiQMA0kShoEkCcNAkoRhIEkC/hdVVliRb+I2xgAAAABJRU5ErkJggg==\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"fig, ax = plt.subplots()\n",
"ax.hist(samples, bins=30)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So that's good: looks pretty normal to me."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sequential Importance Sampling\n",
"Now imagine we are dealing with a problem in which data is sequential, like a time series or a linear sequence of nucleotides in DNA. Let's assume that after $n$ (time) steps, you have $n$ observations $y_1, \\ldots, y_n \\equiv y_{1:n}$ and you want to know which parameters $x_1, \\ldots, x_n\\equiv x_{1:n}$ at steps $1, \\ldots, n$ gave rise to these observations. Then you would have to draw samples from the posterior distribution $p(x_{1:n}|y_{1:n})$ of \"paths\" $x_{1:n}$. Say you somehow managed to do that and proceed to step $n+1$. Then you will have to sample from the posterior distributions of paths $x_{1:n+1}$. This gets pretty unwieldy for long sequences. \n",
"But if you can assume that the sequence of $x$'ses is a Markov chain, namely, $p(x_{n+1}|x_{1:n}) = p(x_{n+1}|x_n)$, then you can recycle samples from previous time steps.\n",
"When using Importance Sampling to sample from these distributions, the importance weights can be calculated recursively. A nice special case is obtained if the importance distribution $q$ is just the prior distribution $p(x_{1:n})$. In that case, the importance weight of the $i$-th sample is given by\n",
"\\begin{equation}\n",
" w(x^i_n) \\propto w(x^i_{n-1}) p(y_n|x^i_n)\n",
"\\end{equation}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A problem with SIS is now that, after already a few steps, only very few weights will have values significantly different from zero. TODO: why exactly is that? I suspect that somehow the weights keep the memory of the initial stage when there was only very little data and the probabilities $p(y|x)$ are very low becaues of bad estimates of the $x$."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So let's implement SIS. First, we write a class representing a \"sequential prior distribution\", which encodes the Markov property. Basically, it draws new samples based only on the most recent batch of samples:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"class SequentialPrior(Pdf):\n",
"\n",
" def __init__(self, distribution, first_state):\n",
"\n",
" self.distribution = distribution\n",
" self.samples = first_state\n",
"\n",
" def sample(self, _):\n",
"\n",
" self.samples = self.distribution.sample(self.samples)\n",
"\n",
" return self.samples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To instantiate a `SequentialPrior` object, we require some conditional distribution (`distribution`) and an initial batch of samples (`first_state`). The `sample` method then draws samples from the distribution conditioned on the previous batch of samples."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next up is the actual Sequential Importance Sampler. It derives from `ImportanceSampler`, but saves the current weights and modifies the `calculate_weights` method to calculate weights recursively:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"class SequentialImportanceSampler(ImportanceSampler):\n",
"\n",
" def __init__(self, posterior):\n",
"\n",
" super(SequentialImportanceSampler, self).__init__(posterior)\n",
" self.weights = None\n",
"\n",
" def sample(self, n_samples):\n",
"\n",
" (imp_samples, weights) = super(\n",
" SequentialImportanceSampler, self).sample(self.posterior.prior,\n",
" n_samples)\n",
" # store current weights to make them available to recursion\n",
" self.weights = weights\n",
"\n",
" return imp_samples, weights\n",
"\n",
" def calculate_weights(self, _, imp_samples):\n",
"\n",
" if self.weights is None:\n",
" self.weights = np.log(np.ones(len(imp_samples)) / len(imp_samples))\n",
"\n",
" likelihoods = self.posterior.likelihood_function(imp_samples)\n",
"\n",
" unnormalized_weights = self.weights + likelihoods\n",
" self.weights = unnormalized_weights - logsumexp(unnormalized_weights)\n",
"\n",
" return self.weights"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we decide for a concrete type of model we want to test this on. We choose a Hidden Markov Modell (HMM) with discrete hidden states and discrete observed states:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"class HMMPosterior(Pdf):\n",
"\n",
" def __init__(self, trans_matrix, obs_probs):\n",
"\n",
" self.observations = []\n",
" self.obs_probs = obs_probs\n",
" self.prior = trans_matrix\n",
"\n",
" def likelihood_function(self, hidden_states):\n",
"\n",
" return np.log(self.obs_probs[hidden_states, self.observations[-1]])\n",
"\n",
" def add_observation(self, x):\n",
"\n",
" self.observations.append(x)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This guy takes a transition matrix for the hidden states (`trans_matrix`) and a matrix with the observation probabilities conditioned on hidden states (`obs_probs`). The likelihood function evaluates the likelihood of the most recent observation given a hidden state. `add_observation`, well, adds an observation. \n",
"The last ingredient now is a transition matrix object:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class TransitionMatrix(object):\n",
"\n",
" def __init__(self, transition_matrix):\n",
"\n",
" self.transition_matrix = transition_matrix\n",
" self.hidden_states = np.arange(len(transition_matrix))\n",
"\n",
" def sample(self, old_states):\n",
"\n",
" return np.array([choice(self.hidden_states,\n",
" p=self.transition_matrix[hidden_state])\n",
" for hidden_state in old_states])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It basically just wraps a `numpy` matrix and provides a neat `sample` function that provides the required interface for use with `SequentialPrior`. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we have to stitch all of this together. First we define the parameters of the HMM:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"hidden_states = [0, 1]\n",
"observation_states = [0, 1]\n",
"start_prob = np.array([0.5, 0.5])\n",
"trans_matrix = np.array([[0.9, 0.1],\n",
" [0.1, 0.9]])\n",
"obs_probs = np.array([[0.9, 0.1],\n",
" [0.1, 0.9]])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's choose some number of importance samples which we will draw each time a new observation comes in:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"n_particles = 1000"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We now draw an initial state for the Markov chain in the HMM and instantiate all objects we need:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"start_state = choice(hidden_states, n_particles, p=start_prob)\n",
"\n",
"pdist = TransitionMatrix(trans_matrix)\n",
"prior = SequentialPrior(pdist, start_state)\n",
"posterior = HMMPosterior(prior, obs_probs)\n",
"sampler = SequentialImportanceSampler(posterior)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"No inference without some nice fake data generated from the model:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"n_observations = 30\n",
"# we have to start with some state...\n",
"true_states = [0]\n",
"observations = []\n",
"for _ in range(n_observations):\n",
" new_state = choice(hidden_states, p=trans_matrix[true_states[-1]])\n",
" new_observation = choice(\n",
" observation_states, p=obs_probs[new_state])\n",
" observations.append(new_observation)\n",
" true_states.append(new_state)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Slowly getting there. Here's two variables which will hold successively more samples and importance weights:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"# will store sucessive posterior samples; is enlarged in each iteration\n",
"states_samples = None\n",
"# stores log-weights for all samples\n",
"all_log_ws = []"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aaaaand we sample. We're in an on-line setting and pretend to have a new observation after each time step:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"for observation in observations:\n",
" # we're on-line: a new observation is arriving!\n",
" posterior.add_observation(observation)\n",
"\n",
" # draw samples from importance distribution and\n",
" # store them along with their corresponding importance\n",
" # weights\n",
" (samples, log_ws) = sampler.sample(n_particles)\n",
"\n",
" if states_samples is None:\n",
" states_samples = samples.reshape(1, n_particles)\n",
" else:\n",
" # append n_particles samples to states_samples\n",
" states_samples = np.vstack((states_samples, samples))\n",
"\n",
" all_log_ws.append(log_ws)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, to get samples from the target posterior distribution of hidden states, we have to draw unbiased samples again:"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"# ... and back to actual probabilities\n",
"all_weights = np.exp(all_log_ws)\n",
"# get samples from target by reweighting importance samples\n",
"target_samples = np.array([choice(samples, n_samples, p=weight)\n",
" for weight, samples in zip(all_weights,\n",
" states_samples)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And finally, a fancy plot summarizing the results:"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 648x216 with 2 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"# make fancy plots\n",
"times = np.arange(n_observations)\n",
"fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(9,3))\n",
"# plot data and time trace of one of the posterior probabilities\n",
"ax1.scatter(times, observations)\n",
"ax1.plot(times, target_samples.sum(1) / n_samples,\n",
" ls='--', color='orange', label=r'$p(x=1|y_{1:t})$')\n",
"ax1.set_yticks((0, 1))\n",
"ax1.set_xlabel(\"time\")\n",
"ax1.legend()\n",
"\n",
"# for each time step, plot cumulative distribution of weights\n",
"# whose lines get darker with increasing time step\n",
"for alpha, weights in zip(np.linspace(1, 0, n_observations),\n",
" all_weights):\n",
" ax2.plot(np.sort(weights),\n",
" np.linspace(0, 1, n_particles, endpoint=False),\n",
" color=\"black\", alpha=alpha)\n",
"# double log scale because the plot sucks otherwise\n",
"ax2.set_xscale(\"log\")\n",
"ax2.set_yscale(\"log\")\n",
"ax2.set_ylabel(\"empirical cumulative\\ndistribution function\")\n",
"ax2.set_xlabel(\"weight value\")\n",
"\n",
"fig.tight_layout()\n",
"plt.show()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The left plot shows the data together with the posterior probabilities for the hidden state to be $1$. The right plot shows the empirical cumulative distribution function (eCDF) of the weight values for different times. So the $y$-axis shows the fraction of weights which are smaller than a certain value. The more time progresses, the fainter the lines get. They shift towards the left, meaning that more and more weights have values of essentially zero, which nicely illustrates the weight decay."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "venv"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import logsumexp
choice = np.random.choice
class Pdf(object):
def __call__(self, x):
"""log-probability"""
pass
def sample(self, n):
pass
class ImportanceSampler(object):
def __init__(self, posterior):
self.posterior = posterior
def sample(self, imp_dist, n_samples):
imp_samples = imp_dist.sample(n_samples)
imp_weights = self.calculate_weights(imp_dist, imp_samples)
# doing stuff in log-space to avoid underflows
normalized_imp_weights = imp_weights - logsumexp(imp_weights)
return imp_samples, normalized_imp_weights
def calculate_weights(self, imp_dist, imp_samples):
return self.posterior(imp_samples) - imp_dist(imp_samples)
class SequentialImportanceSampler(ImportanceSampler):
def __init__(self, posterior):
super(SequentialImportanceSampler, self).__init__(posterior)
self.weights = None
def sample(self, n_samples):
(imp_samples, weights) = super(
SequentialImportanceSampler, self).sample(self.posterior.prior,
n_samples)
# store current weights to make them available to recursion
self.weights = weights
return imp_samples, weights
def calculate_weights(self, _, imp_samples):
if self.weights is None:
self.weights = np.log(np.ones(len(imp_samples)) / len(imp_samples))
likelihoods = self.posterior.likelihood_function(imp_samples)
unnormalized_weights = self.weights + likelihoods
self.weights = unnormalized_weights - logsumexp(unnormalized_weights)
return self.weights
class Gaussian(Pdf):
def __init__(self, mu=0, sigma=1):
self.mu = mu
self.sigma = sigma
def __call__(self, x):
return -0.5 * (x - self.mu) ** 2 / self.sigma / self.sigma
def sample(self, n):
return np.random.normal(self.mu, self.sigma, n)
class Uniform(Pdf):
def __init__(self, low, high):
self.low = low
self.high = high
def __call__(self, x):
return np.repeat(-np.log(self.high - self.low), len(x))
def sample(self, n):
return np.random.uniform(self.low, self.high, n)
class HMMPosterior(Pdf):
def __init__(self, trans_matrix, obs_probs):
self.observations = []
self.obs_probs = obs_probs
self.prior = trans_matrix
def likelihood_function(self, hidden_states):
return np.log(self.obs_probs[hidden_states, self.observations[-1]])
def add_observation(self, x):
self.observations.append(x)
class TransitionMatrix(object):
def __init__(self, transition_matrix):
self.transition_matrix = transition_matrix
self.hidden_states = np.arange(len(transition_matrix))
def sample(self, old_states):
return np.array([choice(self.hidden_states,
p=self.transition_matrix[hidden_state])
for hidden_state in old_states])
class SequentialPrior(Pdf):
def __init__(self, distribution, first_state):
self.distribution = distribution
self.samples = first_state
def sample(self, n_samples):
self.samples = self.distribution.sample(self.samples)
return self.samples
if True:
if not False:
# importance sampling
n_samples = 200000
target = Gaussian()
imp = Uniform(-10, 10)
sampler = ImportanceSampler(target)
biased_samples, logws = sampler.sample(imp, n_samples)
# we have samples from the importance distribution,
# but not from the target distribution. So we have
# to reweight. To get samples from the target,
# just draw from the list of importance samples with
# probabilities given by the weights
samples = choice(
biased_samples, 2*n_samples, p=np.exp(logws))
fig, ax = plt.subplots()
ax.hist(samples, bins=30)
plt.show()
if True:
# sequential importance sampling
n_particles = 100
# define multinomial HMM (discrete hidden states,
# discrete observation states)
hidden_states = [0, 1]
observation_states = [0, 1]
start_prob = np.array([0.5, 0.5])
trans_matrix = np.array([[0.9, 0.1],
[0.1, 0.9]])
obs_probs = np.array([[0.9, 0.1],
[0.1, 0.9]])
# draw first state of Markov chain from the start distribution
start_state = choice(hidden_states, n_particles, p=start_prob)
pdist = TransitionMatrix(trans_matrix)
prior = SequentialPrior(pdist, start_state)
posterior = HMMPosterior(prior, obs_probs)
sampler = SequentialImportanceSampler(posterior)
# generate data
n_observations = 30
# we have to start with some state...
true_states = [0]
observations = []
for _ in range(n_observations):
new_state = choice(hidden_states, p=trans_matrix[true_states[-1]])
new_observation = choice(
observation_states, p=obs_probs[new_state])
observations.append(new_observation)
true_states.append(new_state)
# will store sucessive posterior samples; is enlarged in each iteration
states_samples = None
# stores log-weights for all samples
all_log_ws = []
for observation in observations:
# we're on-line: a new observation is arriving!
posterior.add_observation(observation)
# draw samples from importance distribution and
# store them along with their corresponding importance
# weights
(samples, log_ws) = sampler.sample(n_particles)
if states_samples is None:
states_samples = samples.reshape(1, n_particles)
else:
# append n_particles samples to states_samples
states_samples = np.vstack((states_samples, samples))
all_log_ws.append(log_ws)
# ... and back to actual probabilities
all_weights = np.exp(all_log_ws)
# get samples from target by reweighting importance samples
target_samples = np.array([choice(samples, n_samples, p=weight)
for weight, samples in zip(all_weights,
states_samples)])
# make fancy plots
times = np.arange(n_observations)
fig, (ax1, ax2) = plt.subplots(1, 2)
# plot data and time trace of one of the posterior probabilities
ax1.scatter(times, observations)
ax1.plot(times, target_samples.sum(1) / n_samples,
ls='--', color='orange', label=r'$p(x=1|y_{1:t})$')
ax1.set_yticks((0, 1))
ax1.set_xlabel("time")
ax1.legend()
# for each time step, plot cumulative distribution of weights
# whose lines get darker with increasing time step
for alpha, weights in zip(np.linspace(1, 0, n_observations),
all_weights):
ax2.plot(np.sort(weights),
np.linspace(0, 1, n_particles, endpoint=False),
color="black", alpha=alpha)
# double log scale because the plot sucks otherwise
ax2.set_xscale("log")
ax2.set_yscale("log")
ax2.set_ylabel("empirical cumulative\ndistribution function")
ax2.set_xlabel("weight value")
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment