In [1]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons

import torch
import torch.nn as nn 
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

device = 'cuda' if torch.cuda.is_available() else 'cpu'
In [2]:
# ----- data -----
X, y = make_moons(n_samples=2500, noise=0.1, random_state=0)
X = (X - X.mean(0)) / X.std(0) # standardize

def plot_moons(X, title="2D Moons Dataset"):
    # plot
    plt.figure(figsize=(5, 5))
    plt.scatter(X[:,0], X[:,1], s=10, alpha=0.7)
    plt.title(title)
    plt.xlabel("x1")
    plt.ylabel("x2")
    plt.gca().set_aspect('equal', adjustable='box')
    plt.show()

plot_moons(X)

def dataloader(X, y, batch_size=64, shuffle=True):    
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.int64)
    data = TensorDataset(X, y)
    return DataLoader(data, batch_size=batch_size, shuffle=shuffle)

loader = dataloader(X, y, batch_size=128, shuffle=True)

Semi-supervised and generative models

In the lectures the Variational Autoencoder (VAE) was introduced as a specific generative latent variable model. Compared to probabilistic PCA, VAE employs nonlinear functions (neural network) for encoding and decoding, and its learning is achieved by maximizing the evidence lower bound (ELBO).

Exercise 9.1 - Understand and derive the ELBO

Given observed data points $\{ x_n \}_{n=1}^N$ distributed according to some unknown data distribution $p_\mathrm{data}(x)$, the goal of generative models is to learn a model $p_\theta(x)$ that we can sample from, such that $p_\theta(x)$ is as similar to $p_\mathrm{data}(x)$ as possible.

Let's consider the following latent variable model:

$$ x \sim p_\theta(x|z), \quad z \sim p(z). $$

where $p(z)$ could be a simple Gaussian and $\theta$ is the parameter of a neural network. Then the marginal likelihood of $x$ can be writen as

$$ p_\theta(x) = \int p_\theta(x | z) p(z) dz, $$

which is the objective we want to maximize. But unfortunately, this integral is usually intractable due to the nonlinar and high-dimensional functions (i.e., neural network) inside $z$! Instead, we try to find a lower bound on $p_\theta(x)$ that is easier to compute, which gives the ELBO objective.

Specifically, we focus on maximizing the log-likelihood $\log p_\theta(x)$ by introducing an approximate posterior $q_\phi(z|x)$, as

$$ \log p_\theta(x) = \log \int p_\theta(x|z) p(z) \, dz = \log \int q_\phi(z|x)\,\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)} dz = \log \mathbb{E}_{q_\phi(z|x)}\left[\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right]. $$

Applying Jensen's Inequality to this equation results in the Evidence Lower BOund (ELBO):

$$ \begin{aligned} \mathcal{L}(x; \theta, \phi) &= \log \mathbb{E}_{q_\phi(z|x)}\left[\frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right] \\ &\geq \mathbb{E}_{q_\phi(z|x)}\left[\log \frac{p_\theta(x|z) p(z)}{q_\phi(z|x)}\right] \\ &= \mathbb{E}_{q_\phi(z|x)}\left[\log p_\theta(x|z)\right] - D_{KL}(q_\phi(z|x) \| p(z)). \end{aligned} $$

The first term $\mathbb{E}_{q_\phi(z|x)}\left[\log p_\theta(x|z)\right]$ is called the reconstruction loss maximizing the log-likelihood and the second term $-D_{KL}(q_\phi(z|x) \| p(z))$ is a regularizer minimizing the distance between $q_\phi(z|x)$ and $p(z)$.

So, optimizing the ELBO means learning an encoder ($\phi$) and a decoder ($\theta$) that fits the dataset while ensuring the encoded latent variable $z$ remains close to the prior $p(z)$.

Exercise 9.2 - Implement the VAE

In this exercise, you will learn the key techniques that make VAEs actually trainable and will implement the VAE on a small dataset.

Compute the KL divergence

Note that the KL term has a closed-form if both the approximate posterior $q_\phi(z|x)$ and prior $p(z)$ are Gaussian distribution. For example, we define $p(z)$ a standard multivariate Gaussian and $q_\phi(z|x)$ a multivariate Gaussian with a diagonal covariance matrix:

$$ p(z) = \mathcal{N}(0, I), \quad q_\phi(z|x) = \mathcal{N}(\mu_\phi(x), \mathrm{diag}(\sigma^2_\phi(x))). $$

The KL divergence between them has a simple yet analytical solution given by

$$ D_{KL}(q_\phi(z|x) \| p(z)) = -\frac{1}{2} \sum_{n=1}^N (1 + \log (\sigma_\phi^2(x_n)) - \mu_\phi^2(x_n) - \sigma_\phi^2(x_n)), $$

where $N$ is the number of dimensions in the latent space. $\mu_\phi$ and $\sigma_\phi^2$ are the mean and variance generated by the encoder, respectively.

(a) KL Implementation:

In [3]:
def kl_std_normal(mu, logvar):
    # mu and log_var are the outputs of the encoder network
    # KL(N(mu, diag(sigma^2)) || N(0, I))
    return -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=-1)

Reparameterization trick

The question now turns to how to compute the expectation over posterior i.e., $\mathbb{E}_{z \sim q_\phi(z|x)}\left[\log p_\theta(x|z)\right]$). However, the latent variable $z$ is sampled from $q_\phi(z | x)$, whose parameter $\phi$ is what we want to optimize as well. And the sampling process itself, such as , is not differentiable. Then how can we compute the gradient $\nabla_\phi$ from the random sampling node and make sure it is avaiable in backpropagation? The solution is the Reparameterization Trick.

The trick is to separate the randomness from the parameters. Instead of directly sampling $z \sim q_\phi(z | x)$, we generate $z$ from the following two steps:

  1. Sample a random noise from the standard normal distribution: $\epsilon \sim \mathcal{N}(0, I)$.
  2. Obtain $z$ from a deterministic, differentiable function: $z = \mu_\phi(x) + \sigma_\phi(x) \cdot \epsilon$.

In this way, $z$ is still a random variable with distribution $\mathcal{N}(\mu_\phi(x), \sigma^2_\phi(x))$, but the randomness comes entirely from $\epsilon$ rather than the encoder.

(b) Sampling Implementation:

In [4]:
def reparameterize(mu, logvar):
    # z = mu + sigma * eps, eps ~ N(0, I)
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return mu + std * eps

(c) The final ELBO loss function:

In [5]:
def elbo_loss(model, x, beta=0.1):
    mu, logvar = model.encode(x)
    z = reparameterize(mu, logvar)
    x_hat = model.decode(z)

    # Reconstruction loss
    recon_loss = F.l1_loss(x_hat, x, reduction='none').sum(dim=1)
    
    # KL divergence loss
    kl_loss = kl_std_normal(mu, logvar)
    loss = (recon_loss + beta * kl_loss).mean()
    return loss, recon_loss.mean().detach(), kl_loss.mean().detach()

Now we can define the Network architecture and train the VAE model.

Task: Read through and understand the implementation of the training procedure.

In [6]:
class VAE(nn.Module):
    def __init__(self, dim=128):
        super(VAE, self).__init__()

        # nonlinear encoder with parameter theta
        self.encoder = nn.Sequential(
            nn.Linear(2, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, 4)
        )

        # nonlinear decoder with parameter phi
        self.decoder = nn.Sequential(
            nn.Linear(2, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, dim), nn.ReLU(),
            nn.Linear(dim, 2)
        )

    def encode(self, x):
        """
        Predict mean and log-covariance.
        """
        mu, logvar = self.encoder(x).chunk(2, dim=1)
        return mu, logvar

    def decode(self, z):
        """
        Return reconstructed data.
        """
        return self.decoder(z)
    
    def forward(self, x):
        """
        Encode and decode with the Reparameterization trick
        """
        mu, logvar = self.encode(x)
        z = reparameterize(mu, logvar)
        x_hat = self.decode(z)
        return x_hat, mu, logvar
    
    
def train_vae(epochs=200, lr=0.001, beta=0.1, print_every=20):
    vae = VAE(dim=128).to(device)
    optimizer = torch.optim.Adam(vae.parameters(), lr=lr)
    
    for epoch in range(1, epochs+1):
        for x, _ in loader:
            x = x.to(device)
            loss, recon, kl = elbo_loss(vae, x, beta=beta)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if epoch % print_every == 0:
            print(f"Epoch: {epoch} | loss: {loss.item():.4f} | recon {recon.item():.4f} | kl {kl.item():.4f}")
    
    return vae

vae = train_vae()
Epoch: 20 | loss: 0.5688 | recon 0.2206 | kl 3.4828
Epoch: 40 | loss: 0.5581 | recon 0.2179 | kl 3.4019
Epoch: 60 | loss: 0.5725 | recon 0.2471 | kl 3.2539
Epoch: 80 | loss: 0.5732 | recon 0.2388 | kl 3.3441
Epoch: 100 | loss: 0.5164 | recon 0.1718 | kl 3.4465
Epoch: 120 | loss: 0.5594 | recon 0.2095 | kl 3.4991
Epoch: 140 | loss: 0.5505 | recon 0.2261 | kl 3.2445
Epoch: 160 | loss: 0.5439 | recon 0.2151 | kl 3.2874
Epoch: 180 | loss: 0.5640 | recon 0.2409 | kl 3.2312
Epoch: 200 | loss: 0.5567 | recon 0.2144 | kl 3.4230

Exercise 9.3 - Model Inference

Once trained, we can obtain new data points by sampling from the noise distribution.

In [7]:
with torch.no_grad():
    X_tensor = torch.tensor(X, dtype=torch.float32)
    x_hat, mu, logvar = vae.forward(X_tensor)

x_hat, mu = x_hat.numpy(), mu.numpy()

fig, axes = plt.subplots(1, 3, figsize=(16, 5))
axes[0].scatter(X[:, 0], X[:, 1], c='blue', alpha=0.6)
axes[0].set_title('Original Moons Dataset')
axes[0].set_xlabel('Feature 1')
axes[0].set_ylabel('Feature 2')
axes[0].grid(True, alpha=0.3)

axes[1].scatter(x_hat[:, 0], x_hat[:, 1], c='red', alpha=0.6)
axes[1].set_title('Reconstructed Data')
axes[1].set_xlabel('Feature 1')
axes[1].set_ylabel('Feature 2')
axes[1].grid(True, alpha=0.3)

# Latent space representation
axes[2].scatter(mu[:, 0], mu[:, 1], c='green', alpha=0.6)
axes[2].set_title('Latent Space (μ)')
axes[2].set_xlabel('Latent Dimension 1')
axes[2].set_ylabel('Latent Dimension 2')
axes[2].grid(True, alpha=0.3)

(d) Sampling data from noise:

In [8]:
@torch.no_grad()
def vae_sampling(n_samples=2000):
    z = torch.randn(n_samples, 2).to(device)
    return vae.decode(z)

samples = vae_sampling()
plot_moons(samples, title="Sampled data from noise")
In [ ]: