%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# ----- data -----
X, y = make_moons(n_samples=2500, noise=0.1, random_state=0)
X = (X - X.mean(0)) / X.std(0) # standardize
def plot_moons(X, title="2D Moons Dataset"):
    # plot
    plt.figure(figsize=(5, 5))
    plt.scatter(X[:, 0], X[:, 1], s=10, alpha=0.7)
    plt.title(title)
    plt.xlabel("x1")
    plt.ylabel("x2")
    plt.gca().set_aspect('equal', adjustable='box')
    plt.show()
plot_moons(X)
def dataloader(X, y, batch_size=64, shuffle=True):
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.int64)
    data = TensorDataset(X, y)
    return DataLoader(data, batch_size=batch_size, shuffle=shuffle)
loader = dataloader(X, y, batch_size=128, shuffle=True)
In the lectures, the Variational Autoencoder (VAE) was introduced as a specific generative latent variable model. Compared to probabilistic PCA, the VAE employs nonlinear functions (neural networks) for encoding and decoding, and it is trained by maximizing the evidence lower bound (ELBO).
Given observed data points $\{ x_n \}_{n=1}^N$ distributed according to some unknown data distribution $p_\mathrm{data}(x)$, the goal of generative models is to learn a model $p_\theta(x)$ that we can sample from, such that $p_\theta(x)$ is as similar to $p_\mathrm{data}(x)$ as possible.
Let's consider the following latent variable model:
$$ x \sim p_\theta(x|z), \quad z \sim p(z), $$where $p(z)$ could be a simple Gaussian prior and $\theta$ denotes the parameters of a neural network. Then the marginal likelihood of $x$ can be written as
$$ p_\theta(x) = \int p_\theta(x | z) p(z) dz, $$which is the objective we want to maximize. Unfortunately, this integral is usually intractable because of the nonlinear, high-dimensional function (i.e., the neural network) applied to $z$! Instead, we try to find a lower bound on $\log p_\theta(x)$ that is easier to compute, which gives the ELBO objective.
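To see why working with $p_\theta(x)$ directly is awkward, here is a small illustrative sketch (not part of the exercise) of the naive Monte Carlo estimate $p_\theta(x) \approx \frac{1}{K}\sum_k p_\theta(x \mid z_k)$ with $z_k \sim p(z)$. The `toy_decoder` below is a hypothetical, untrained stand-in with a fixed-variance Gaussian likelihood; in low dimensions this works, but the estimator needs prohibitively many samples when $x$ is high-dimensional, which motivates the ELBO.
# Illustrative sketch only (not part of the exercise): naive Monte Carlo estimate of
# p_theta(x) by sampling z from the prior. "toy_decoder" is a hypothetical, untrained
# mean network with likelihood p_theta(x|z) = N(x; toy_decoder(z), I).
toy_decoder = nn.Linear(2, 2)
x_query = torch.zeros(1, 2)                       # a single query point x
K = 10_000
z_k = torch.randn(K, 2)                           # z_k ~ p(z) = N(0, I)
with torch.no_grad():
    log_px_given_z = torch.distributions.Normal(toy_decoder(z_k), 1.0).log_prob(x_query).sum(dim=-1)
    # log p(x) ≈ logsumexp_k log p(x|z_k) - log K
    log_px_estimate = torch.logsumexp(log_px_given_z, dim=0) - torch.log(torch.tensor(float(K)))
print(log_px_estimate.item())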
Specifically, we focus on maximizing the log-likelihood $\log p_\theta(x)$ by introducing an approximate posterior $q_\phi(z|x)$, as
$$ \log p_\theta(x) = \log \int p_\theta(x|z) p(z) \, dz = \log \int q_\phi(z|x)\,\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)} dz = \log \mathbb{E}_{q_\phi(z|x)}\left[\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right]. $$Applying Jensen's Inequality to this equation results in the Evidence Lower BOund (ELBO):
$$ \begin{aligned} \log p_\theta(x) &= \log \mathbb{E}_{q_\phi(z|x)}\left[\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right] \\ &\geq \mathbb{E}_{q_\phi(z|x)}\left[\log \frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right] \\ &= \mathbb{E}_{q_\phi(z|x)}\left[\log p_\theta(x|z)\right] - D_{KL}(q_\phi(z|x) \| p(z)) =: \mathcal{L}(x; \theta, \phi). \end{aligned} $$The first term $\mathbb{E}_{q_\phi(z|x)}\left[\log p_\theta(x|z)\right]$ is the reconstruction term, which rewards accurate reconstruction of $x$, and the second term $-D_{KL}(q_\phi(z|x) \| p(z))$ is a regularizer that keeps $q_\phi(z|x)$ close to the prior $p(z)$.
So, optimizing the ELBO means learning an encoder ($\phi$) and a decoder ($\theta$) that fit the dataset while keeping the distribution of the encoded latent variable $z$ close to the prior $p(z)$.
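To make the bound concrete, here is a small self-contained check (an illustration, not part of the exercise) on a 1-D toy model where everything is tractable: with $p(z) = \mathcal{N}(0, 1)$ and $p_\theta(x|z) = \mathcal{N}(z, 1)$, the marginal is $p_\theta(x) = \mathcal{N}(0, 2)$, so we can verify numerically that the Monte Carlo ELBO stays below $\log p_\theta(x)$ for an arbitrary (suboptimal) Gaussian $q(z|x)$.
# Toy check of the bound (illustration only): p(z) = N(0,1), p(x|z) = N(z,1)  =>  p(x) = N(0,2).
x_val = torch.tensor(1.5)
q = torch.distributions.Normal(loc=0.4 * x_val, scale=0.9)            # an arbitrary approximate posterior
z_s = q.sample((100_000,))
log_p_x_given_z = torch.distributions.Normal(z_s, 1.0).log_prob(x_val)
log_p_z = torch.distributions.Normal(0.0, 1.0).log_prob(z_s)
log_q_z = q.log_prob(z_s)
elbo_mc = (log_p_x_given_z + log_p_z - log_q_z).mean()                # Monte Carlo ELBO
log_px_exact = torch.distributions.Normal(0.0, torch.sqrt(torch.tensor(2.0))).log_prob(x_val)
print(f"ELBO ≈ {elbo_mc.item():.3f}  <=  log p(x) = {log_px_exact.item():.3f}")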
In this exercise, you will learn the key techniques that make VAEs actually trainable and will implement the VAE on a small dataset.
Note that the KL term has a closed form if both the approximate posterior $q_\phi(z|x)$ and the prior $p(z)$ are Gaussian. For example, we define $p(z)$ as a standard multivariate Gaussian and $q_\phi(z|x)$ as a multivariate Gaussian with a diagonal covariance matrix:
$$ p(z) = \mathcal{N}(0, I), \quad q_\phi(z|x) = \mathcal{N}(\mu_\phi(x), \mathrm{diag}(\sigma^2_\phi(x))). $$The KL divergence between them has a simple yet analytical solution given by
$$ D_{KL}(q_\phi(z|x) \| p(z)) = -\frac{1}{2} \sum_{j=1}^{J} \left(1 + \log \sigma_{\phi,j}^2(x) - \mu_{\phi,j}^2(x) - \sigma_{\phi,j}^2(x)\right), $$where $J$ is the number of dimensions in the latent space, and $\mu_{\phi,j}$ and $\sigma_{\phi,j}^2$ are the $j$-th components of the mean and variance produced by the encoder, respectively.
(a) KL Implementation:
def kl_std_normal(mu, logvar):
    # mu and logvar are the outputs of the encoder network
    # KL(N(mu, diag(sigma^2)) || N(0, I)), summed over the latent dimensions
    return -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=-1)
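As a quick sanity check (not required for the exercise), the closed-form expression above can be compared against `torch.distributions.kl_divergence` on random encoder outputs:
# Sanity check of kl_std_normal against torch.distributions (illustration only).
mu_test = torch.randn(4, 2)
logvar_test = torch.randn(4, 2)
q_test = torch.distributions.Normal(mu_test, torch.exp(0.5 * logvar_test))
p_test = torch.distributions.Normal(torch.zeros_like(mu_test), torch.ones_like(mu_test))
kl_ref = torch.distributions.kl_divergence(q_test, p_test).sum(dim=-1)   # sum over latent dims
print(torch.allclose(kl_std_normal(mu_test, logvar_test), kl_ref, atol=1e-6))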
The question now is how to compute the expectation over the posterior, i.e., $\mathbb{E}_{z \sim q_\phi(z|x)}\left[\log p_\theta(x|z)\right]$. The latent variable $z$ is sampled from $q_\phi(z | x)$, whose parameter $\phi$ is exactly what we want to optimize, and the sampling operation itself, such as drawing $z \sim \mathcal{N}(\mu_\phi(x), \mathrm{diag}(\sigma^2_\phi(x)))$, is not differentiable with respect to $\phi$. How, then, can we compute the gradient $\nabla_\phi$ through the random sampling node and make it available for backpropagation? The solution is the Reparameterization Trick.
The trick is to separate the randomness from the parameters. Instead of directly sampling $z \sim q_\phi(z | x)$, we generate $z$ in the following two steps:
1. Sample a noise variable $\epsilon \sim \mathcal{N}(0, I)$ that does not depend on $\phi$.
2. Compute $z = \mu_\phi(x) + \sigma_\phi(x) \odot \epsilon$ as a deterministic function of $\mu_\phi(x)$, $\sigma_\phi(x)$, and $\epsilon$.
In this way, $z$ is still a random variable with distribution $\mathcal{N}(\mu_\phi(x), \sigma^2_\phi(x))$, but the randomness comes entirely from $\epsilon$ rather than the encoder.
(b) Sampling Implementation:
def reparameterize(mu, logvar):
    # z = mu + sigma * eps, eps ~ N(0, I)
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return mu + std * eps
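A quick way to convince yourself that the trick works (illustration only): with `reparameterize`, the sample `z` is a differentiable function of `mu` and `logvar`, so both receive gradients.
# Gradient check (illustration only): gradients flow through the sampled z to the encoder outputs.
mu_chk = torch.zeros(3, 2, requires_grad=True)
logvar_chk = torch.zeros(3, 2, requires_grad=True)
z_chk = reparameterize(mu_chk, logvar_chk)
z_chk.sum().backward()
print(mu_chk.grad is not None, logvar_chk.grad is not None)   # both should be True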
(c) The final ELBO loss function:
def elbo_loss(model, x, beta=0.1):
    mu, logvar = model.encode(x)
    z = reparameterize(mu, logvar)
    x_hat = model.decode(z)
    # Reconstruction loss (per-sample L1, playing the role of -log p_theta(x|z) up to constants)
    recon_loss = F.l1_loss(x_hat, x, reduction='none').sum(dim=1)
    # KL divergence loss
    kl_loss = kl_std_normal(mu, logvar)
    # Negative ELBO with a beta weight on the KL term
    loss = (recon_loss + beta * kl_loss).mean()
    return loss, recon_loss.mean().detach(), kl_loss.mean().detach()
Now we can define the Network architecture and train the VAE model.
Task: Read through and understand the implementation of the training procedure.
class VAE(nn.Module):
    def __init__(self, dim=128):
        super(VAE, self).__init__()
        # nonlinear encoder with parameters phi: maps x to (mu, logvar) of q_phi(z|x)
        self.encoder = nn.Sequential(
            nn.Linear(2, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, 4)
        )
        # nonlinear decoder with parameters theta: maps z back to data space
        self.decoder = nn.Sequential(
            nn.Linear(2, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, 2)
        )

    def encode(self, x):
        """
        Predict the mean and log-variance of q_phi(z|x).
        """
        mu, logvar = self.encoder(x).chunk(2, dim=1)
        return mu, logvar

    def decode(self, z):
        """
        Return the reconstructed data.
        """
        return self.decoder(z)

    def forward(self, x):
        """
        Encode and decode with the reparameterization trick.
        """
        mu, logvar = self.encode(x)
        z = reparameterize(mu, logvar)
        x_hat = self.decode(z)
        return x_hat, mu, logvar
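Before launching a full training run, a one-batch smoke test of the pieces defined so far can be useful (optional; it assumes the `loader` from the beginning of the notebook):
# Optional smoke test: one forward pass and one ELBO evaluation on a single batch.
_vae_test = VAE(dim=128).to(device)
x_batch, _ = next(iter(loader))
test_loss, test_recon, test_kl = elbo_loss(_vae_test, x_batch.to(device), beta=0.1)
print(f"loss {test_loss.item():.4f} | recon {test_recon.item():.4f} | kl {test_kl.item():.4f}")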
def train_vae(epochs=200, lr=0.001, beta=0.1, print_every=20):
    vae = VAE(dim=128).to(device)
    optimizer = torch.optim.Adam(vae.parameters(), lr=lr)
    for epoch in range(1, epochs + 1):
        for x, _ in loader:
            x = x.to(device)
            loss, recon, kl = elbo_loss(vae, x, beta=beta)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if epoch % print_every == 0:
            print(f"Epoch: {epoch} | loss: {loss.item():.4f} | recon {recon.item():.4f} | kl {kl.item():.4f}")
    return vae
vae = train_vae()
Once trained, we can inspect the reconstructions and the latent space, and generate new data points by sampling $z$ from the prior and decoding it.
with torch.no_grad():
    X_tensor = torch.tensor(X, dtype=torch.float32).to(device)
    x_hat, mu, logvar = vae(X_tensor)
    x_hat, mu = x_hat.cpu().numpy(), mu.cpu().numpy()
fig, axes = plt.subplots(1, 3, figsize=(16, 5))
axes[0].scatter(X[:, 0], X[:, 1], c='blue', alpha=0.6)
axes[0].set_title('Original Moons Dataset')
axes[0].set_xlabel('Feature 1')
axes[0].set_ylabel('Feature 2')
axes[0].grid(True, alpha=0.3)
axes[1].scatter(x_hat[:, 0], x_hat[:, 1], c='red', alpha=0.6)
axes[1].set_title('Reconstructed Data')
axes[1].set_xlabel('Feature 1')
axes[1].set_ylabel('Feature 2')
axes[1].grid(True, alpha=0.3)
# Latent space representation
axes[2].scatter(mu[:, 0], mu[:, 1], c='green', alpha=0.6)
axes[2].set_title('Latent Space (μ)')
axes[2].set_xlabel('Latent Dimension 1')
axes[2].set_ylabel('Latent Dimension 2')
axes[2].grid(True, alpha=0.3)
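As a rough quantitative complement to the plots (optional), one can also report the mean absolute reconstruction error over the dataset:
# Optional: mean absolute reconstruction error (x_hat comes from the cell above).
print("mean |x - x_hat| per coordinate:", np.abs(X - x_hat).mean())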
(d) Sampling data from noise:
@torch.no_grad()
def vae_sampling(n_samples=2000):
    # sample z from the prior p(z) and decode into data space
    z = torch.randn(n_samples, 2).to(device)
    return vae.decode(z).cpu().numpy()
samples = vae_sampling()
plot_moons(samples, title="Sampled data from noise")