# Deep learning lab 2: Transformers and self-attention

The goal of this lab is that you get some hands on experience with the transformer architecture. In the first part of the lab we will work with the self-attention mechanism and implement it in pytorch. For this part the content from the first Lecture on Transformers is sufficient. In the second part we will train two different transformer architectures with different tasks. In the outro session of the lab you are asked to present the solution to the questions of the lab to the teachers.

In [None]:
import numpy as np
import torch

An important part of the Transformer architecture is the fully-connected neural network which we have seen in Assignment and Lab 1. Typically, a neural network with one hidden layer is used for this with the hidden dimension being 4 times larger than the input/output dimension. We use the GeLU non-linear activation function, which is a smoothed out version of the ReLU function you have seen in the previous sessions, for the non-linearity as it is the default choice for Transformer layers. By now, you should be familiar with this general architecture and understand the implementation below.

In [None]:
class MLP(torch.nn.Module):
    def __init__(self, input_dimension):
        super().__init__()
        self.c_fc = torch.nn.Linear(input_dimension, 4 * input_dimension)
        self.gelu = torch.nn.GELU(approximate='tanh')
        self.c_proj = torch.nn.Linear(4 * input_dimension, input_dimension)

    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        return x

## Part 1: Self-attention mechanism

The self-attention mechanism in matrix form is given by the following equations. For more details you can check Section 12.2.4 in the course book.

$$
\mathbf{Sa}[\mathbf{X}] = \mathbf{V}[\mathbf{X}] \cdot \mathbf{Softmax} \left[ \mathbf{K}[\mathbf{X}]^T \mathbf{Q}[\mathbf{X}] \right]
$$

where $\mathbf{Q[X]}$, $\mathbf{K[X]}$, $\mathbf{V[X]}$ are the Key, Query and Value matrices given by linear transformations of the input $x$ to the layer:

\begin{align}
\mathbf{Q}[\mathbf{X}] = \mathbf{\beta_q 1^T + \Omega_q X} \\
\mathbf{K}[\mathbf{X}] = \mathbf{\beta_k 1^T + \Omega_k X} \\
\mathbf{V}[\mathbf{X}] = \mathbf{\beta_v 1^T + \Omega_v X} 
\end{align}

Similar to the book, we do not include the dependence of $\mathbf{Q}$, $\mathbf{K}$, $\mathbf{V}$ on the input $\mathbf{X}$ in the following. Note in particular that the input and output dimensions are the same for the layer.

TransformerBlockSA.svg

When implementing the architecture in pytorch, we rely on the transposed version of the self-attention since we additionally have a batch dimension. The input itself is usually of dimension $(B, N, D)$ where $B$ is the batch size, $N$ is the number of tokens / elements in the sequence and $D$ is the dimensionality of the input also refred to as input embedding.
In its transposed form, the self-attention mechanism can be written as:

$$
\mathbf{Sa}[\mathcal{X}] = \mathbf{Softmax} \left[\mathcal{Q}[\mathcal{X}]  \mathcal{K}[\mathcal{X}]^T \right] \cdot \mathcal{V}[\mathcal{X}]
$$

which is the transposed version of the equation shown above where $\mathcal{X}$ is the input tensor of size $N \times D$. The softmax operation above should be applied row-wise in this case. For an efficient implementation, use the transposed formulation above when implementing the modules in the lab.
This forumlation also allows us to easily parallelize over multiple batches by including the batch dimension as the first dimension of the input tensor. PyTorch parallelizes over the leading dimensions the standard operations using broadcasting.

**Task 1**: In the init method of the single-head self attention, create the parameterized layers that are required to compute the output of the mechanism described above.

**Task 2**: Write the forward pass of the module to implement the self-attention mechanism. Return both the output tokens as well as the attention matrix itself as a second return argument.

_Hints_:
- Learned matrix-vector products can be efficiently implemented using linear layers using torch.nn.Linear.
- When using a transformer, the input usually is a tensor of size (batch_size, num_tokens, embedding_size) = $(B, N, D)$ so be careful when transposing tensors. Using the function .transpose() you can pass the dimensions you want to transpose as an input argument.

In [None]:
class SingleHeadSelfAttention(torch.nn.Module):
    
    def __init__(self, embedding_dimension):
        super().__init__()
        # TODO: implement the key, query and value operators using torch.nn.Linear
    
    def forward(self, x):
        # TODO: implement forward pass here
        
        # return the new tokens and the attention matrix
        return x, None


In [None]:
# Create a single head self-attention layer with embedding size 32
sa = SingleHeadSelfAttention(32)

# create a normally distributed vectors
# vector x is of size (BATCHSIZE, NUM_TOKENS, EMBEDDING_SIZE) = (B, N, D)
x = torch.empty((2, 10, 32)).normal_()
out, att = sa(x)

**Question 1**: What size is the attention matrix that gets returned by the single-head layer? What constraints must the attention matrix and its elements fulfill in terms of the attention matrix size and the values that the elements in the matrix take?

_Hint_: you can look at the following easy checks that verify the constraints of the computed attention matrices.

In [None]:
# the attention matrix should be of size (B, N, N)
att.shape

In [None]:
# all elements in the attention matrix should be non-negative
torch.where(att < 0)

In [None]:
# the rows in the attention matrix should sum to one
torch.sum(att, dim=-1)

Now, we have managed to implement the heart of the transformer -- the self-attention mechamism. In the following, we will extend the architecture to include normalization, multiple attention heads, and passing the results through non-linear MLPs.

In the cell below, you can see a visualization of the attention matrix of the initalized transformer.

**Question 2**: What can you observe about the distribution of attention scores? Why is this problem arising and how can we avoid this?

In [None]:
import matplotlib.pyplot as plt

plt.imshow(att[0].detach().numpy())
plt.show()

**Task 3**: This scaling problem arises from the fact that the softmax function can quickly focus on a single maximum value, causing all other components to become very small. In order to fix the scaling of the attention matrix and make training easier, add the scaling constant given in the book in equation (12.9) to the forward pass of the single-head self-attention layer. How does the attention map look after normalizing the attention step?

**Task 4**: In practice, using the single-head self-attention is usually not sufficient to get good performance of the model. Instead, we implement a **multi-head self-attention** next. Therefore, you need to add an additional hyper-parameter to the module which allows us to specify the number of attention heads we want to use.
Implement the multi-head attention in the code cell below:

_Hints_:
- Note that you can use the same parameterization for the multi-head attention as for the single head case in the init function for the query, key and value transformations.
- You need to add an additional linear transformation to combine the results from the different heads after the self-attention steps.
- After computing the key, queries, and values you should reshape the results in a 4d tensor of size $(B, N, H, D / H)$ where $H$ is the number of heads in your self-attention model.
- Since we want to parallelize the computations over the batch dimension and the number of heads, we want to reshape the tensor such that these dimensions are in the front dimensions. Don't forget to reshuffle again before you bring the output in the final form which is the same as the input itself.

TransformerBlockSAMultiHead.svg


In [None]:
class MultiHeadSelfAttention(torch.nn.Module):
    
    def __init__(self, num_heads, embedding_dimension):
        
        super().__init__()
        assert embedding_dimension % num_heads == 0
        self.num_heads = num_heads

        # TODO: add key, query and value transformations using torch.nn.Linear
        # TODO: add the final projection transformation
    def forward(self, x):
        # (batch_size, num_tokens, embedding_dimension)
        B, N, D = x.size()
        att = None

        # TODO: compute the key, queries and values for the input then reshape the result and compute the scaled self-attention mechanism
        return x, att


In [None]:
sa = MultiHeadSelfAttention(4, 32)

# we create a normally distributed vectors
# vector x is of size (BATCHSIZE, NUM_TOKENS, EMBEDDING_DIMENSION) = (B, N, D)
x = torch.empty((2, 10, 32)).normal_()
out, att = sa(x)

In [None]:
# check the output dimension, expected: (B, N, D)
out.shape

In [None]:
# check the shape of the attention matrix, expected: (B, H, N, N)
att.shape

Next, we want to combine the just implemented self-attention block with additional non-linearities in order to get a Transformer Block which consists of layer norms, self-attention and a multi-layer perceptron as shown in the Figure below:

TransformerBlock.svg

Note that the batch dimension is not shown in this figure.

**Task 5**: Implement the forward pass in the transformer encoder block in the function below. Note that the just implemented attention mechanism returns two arguments the first one being the output of the transformer and the second one the attention matrix that we keep for visualization purposes.

In [None]:
class EncoderBlock(torch.nn.Module):
    def __init__(self, num_heads, embedding_dimension):
        super().__init__()
        self.ln1 = torch.nn.LayerNorm(embedding_dimension)
        self.attn = MultiHeadSelfAttention(num_heads, embedding_dimension)
        self.ln2 = torch.nn.LayerNorm(embedding_dimension)
        self.mlp = MLP(embedding_dimension)

    def forward(self, x):
        att_w = None
        # TODO: implement the forward pass and return the output and the attention matrix
        return x, att_w


In [None]:
encoder = EncoderBlock(2, 32)

x = torch.empty((2, 10, 32)).normal_()
out, att = encoder(x)

# the output should have the same shape as the input x
print(out.shape)

**Question 3**: How many parameters does each part of the transformer block have for an embedding dimension of 100?

## Part 2a: Sequence-to-sequence task - reversing a list

We start with a basic example of a sequence-to-sequence task.
Similar problems arise for example in machine translation and text summarization.
However, here we consider an even simpler problem of reversing an input sequence.

The input consists of $N$ numbers between 0 and $M-1$, each serving as an input token to the Transformer. We can simply represent each of the inputs as a one-hot encoding of the number using a $M$-dimensional vector. Even though this sounds like a very simple task, RNNs and other architectures can have issues with this due to the long range dependencies.

In [None]:
import torch.utils.data as data
from functools import partial

class ReverseDataset(data.Dataset):

    def __init__(self, num_categories, seq_len, size):
        super().__init__()
        self.num_categories = num_categories
        self.seq_len = seq_len
        self.size = size

        self.data = torch.randint(self.num_categories, size=(self.size, self.seq_len))

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        inp_data = self.data[idx]
        labels = torch.flip(inp_data, dims=(0,))
        return inp_data, labels

# hyper-parameters for the dataset
num_classes = 10
sequence_length = 16

# creating a dataset
dataset = partial(ReverseDataset, num_classes, sequence_length)
train_loader = data.DataLoader(dataset(50_000), batch_size=128, shuffle=True, drop_last=True, pin_memory=True)
val_loader   = data.DataLoader(dataset(1_000), batch_size=128)
test_loader  = data.DataLoader(dataset(10_000), batch_size=128)

Here is an example sequence from the dataset with the correctly reversed labels.

In [None]:
inp_data, labels = train_loader.dataset[0]
print("Input data:", inp_data)
print("Labels:    ", labels)

Next, we create a encoder architecture to reverse the list which simply consists of several encoder blocks implemented in the previous section.

**Question 4**: Why are we using an encoder here instead of a decoder or an encoder-decoder architecture?

**Task 6**: In addition to the transformer blocks, we need a predictive network that transforms the network output into the predictions. Implement a neural network with one hidden layer with the given hidden dimesion that takes the output from the last encoder block and predicts the token at each position. You can implement this using a sequential list of modules as started in the code below. Aditionally, we need to add a positional encoding for the network to know the position of the elements in the sequence. This is already implemented in the code below.

In [None]:
class TransformerEncoder(torch.nn.Module):

    def __init__(self, num_layers, sequence_length, num_heads, embedding_dimension):
        super().__init__()
        
        self.layers = torch.nn.ModuleList([EncoderBlock(num_heads, embedding_dimension) for _ in range(num_layers)])
        
        hidden_dimension = 100
        self.predictor = torch.nn.Sequential(
            # TODO: write a non-linear prediction network here
        )

        # positional encoding        
        self.pe = torch.nn.Embedding(sequence_length, embedding_dimension)        
    def forward(self, x):
        atts = []
        
        # Number of tokens
        N = x.shape[1] 

        # positional encodings
        pos_encodings = self.pe(torch.arange(N))
        x = x + pos_encodings
        
        # forward pass
        for l in self.layers:
            x, w = l(x)
            atts.append(w.detach())

        x = self.predictor(x)
        return x, atts


In the function below, we compute the loss function for the task. We use a one-hot encoding for the different numbers in the list and compute the cross entropy loss to classiy the predictions at each position of the reversed list correctly.

In [None]:
def compute_loss(model, batch):
    inp_data, labels = batch
    # forward pass and loss function
    inp_data = torch.nn.functional.one_hot(inp_data, num_classes=num_classes).float()
    output, _ = model(inp_data)
    loss = torch.nn.functional.cross_entropy(output.view(-1, output.size(-1)), labels.view(-1))
    
    acc = (output.argmax(dim=-1) == labels).float().mean()
    return loss, acc

Next, we use the loss function to train our model. This training loop follows the standard mechanism you should be familar with at this stage from the previous assignments and labs. Training this model should only take a few minutes.

In [None]:
# using a one layer with two heads
encoder_model = TransformerEncoder(num_layers=1, num_heads=2, embedding_dimension=num_classes, sequence_length=sequence_length)
optimizer = torch.optim.AdamW(encoder_model.parameters())
num_epochs = 10

losses = []
accs = []

# loop over the epochs and training data
for e in range(num_epochs):
    encoder_model.train()
    
    for b in train_loader:
        optimizer.zero_grad()
        loss, acc = compute_loss(encoder_model, b)
        loss.backward()
        optimizer.step()
        
        # logging
        losses.append(loss.item())
        accs.append(acc)

Below, we can see the loss function and the accuracy for the training performance of the network.

Your final model should reach a performance of 100% on the test data as the considered task is quite easy. You might need to increase the number of heads to get a good performance.

In [None]:
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))

axes[0].set_title("Training loss")
axes[0].plot(losses, alpha=0.5)
axes[0].grid()

axes[1].set_title("Training accuracy")
axes[1].plot(accs, alpha=0.5)
axes[1].grid()

plt.show()

**Question 5**: What happens when removing the positional encoding from the encoder network above with the performance of the network?

In [None]:
# test the model on the unseen test data
test_accs = []
for b in test_loader:
    loss, acc = compute_loss(encoder_model, b)
    test_accs.append(acc)

print("Test set performance", np.mean(test_accs))

In [None]:
# test sample...
test = torch.Tensor([[1, 2, 3, 4, 5, 6, 6, 6, 7, 1, 2, 8, 9, 5, 3, 9]]).type(torch.LongTensor)
print(test)
test = torch.nn.functional.one_hot(test, num_classes=num_classes).float()

output, att = encoder_model(test)

print(torch.argmax(output.squeeze(), dim=1))

In the cell below, you can see a visualization of the two attention matrices of the transformer encoder.

**Question 6**: Can you intepret the pattern in these attention matrices?

In [None]:
# visualize the attention maps
att_maps = att[0]

plt.imshow(att[0][0, 0].detach().numpy())
plt.show()

plt.imshow(att[0][0, 1].detach().numpy())
plt.show()

## Part 2b: Sequence generation task

In the second example, we train a decoder model to generate text. In this part of the lab we are using the tiny sheakspear dataset. It is a small dataset containing the plays by Shakespear. First, we need to obtain the data and read it in. We use every single character as an individual token. Therefore, we first find out the number of distinct characters in all the plays and then create a mapping from each character to a unique number.

For this task it is also nice to use a GPU e.g. on google colab to get a faster training. However, the model can also be trained locally on a CPU and the training time should still be relativly fast.

In [None]:
device = "cpu" # TODO: you can change the device to cuda if you are using colab or a local graphics card.

Let's start by loading the data. Quickly scan the code below and verify that the functions implement the described behaviour above.

In [None]:
# load the dataset
! wget https://raw.githubusercontent.com/uu-sml/course-dl-public/refs/heads/main/Lab2/input.txt

In [None]:
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [None]:
# let's take a quick look at the text :)
text[:100]

In [None]:
# we do character level encoding here:
chars = sorted(list(set(text)))
vocab_size = len(chars)
print(f"Using vocab size {vocab_size}")

In [None]:
# Create a mapping from characters to integers and back.
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # Encoder: Take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # Decoder: take a list of integers, output a string

Now we can get started to implement a decoder model that we can then use for autoregressive sampling to generate new text sequences.

To accomplish this, we first have to adjust the self attention module from above to use causal self attention i.e. only attend to tokens that are ahead in the sequence to them to allow us to efficiently train the model.

To get started on this task, copy the code for the multi-head attention implementation in the cell blow. When computing the self-attention scores, we now want to make sure that each token only attends to tokens that come before the token itself in the sequence.
This can be achieved by applying a triangular masking to the attention weights.
Remember to apply the triangular masking on the unnormalized attention weights before applying the softmax function in order to make sure the columns normalize to one in the attention matrix.

**Task 7**: Implement these changes by adding the masking of the self attention matrix in the forward pass in the new class below that we use for causal self-attention in the decoder that we impelment in the next step.

In [None]:
class MultiHeadCausalSelfAttention(torch.nn.Module):
    # TODO: copy the code from the multihead self-attention here to get started.


A decoder block and Transformer decoder are very similar to the encoder block and the architecture but utilizes causal self-attention instead. However, we need to make some small adjustments to the task in order to get a good performance:
- Instead of using a one-hot encodings for the tokens here, we learn an embedding for each of the 65 tokens to a vector of the embedding dimension that we can choose as a hyper-parameter. For both the positional encoding and the token embedding you should now have a torch.nn.Embedding module.
- The final predictive networks also needs adjustment where we want to increase the hidden dimension and choose the correct output dimension. For the hidden dimension you can choose for example 4 times the embedding dimension of the network. The output dimension is equal to the number of avialble classes.

**Task 8**: Implement the changes for the decoder architecture below, for this you only need to swap out the full attention in the encoder block with the masked attention module.

In [None]:
class DecoderBlock(torch.nn.Module):
    # TODO: to get started, use the previously implemeted encoder block.

class TransformerDecoder(torch.nn.Module):
    # TODO: to get started, copy the previously implemented encoder transformer.


Now, we encode the loaded text data into numerical values and split it into training and test sets. We also implement a function to generate batches from the text data. In decoder models, the task is to predict the next token (in this case, a character) for each input. Therefore, the batch function returns the input shifted one position to the right as the target output.

In [None]:
# Encode the data to numerical values
data = torch.tensor(encode(text), dtype=torch.long)

# First 90% will be train, rest val
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]

# hyper-parameters
block_size = 128
batch_size = 64

# get a batch of data
def get_batch(split):
    # Generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

After the model is trained, we want to sample from it in an autoregressive manner. That means we input a sequence of tokens into the model and compute the probability distribution over the next possible token by the network output. Then we append the newly generated token to the input and repeat this sampling process for a fixed number of steps.

**Task 9**: Implement the sampling in the method below. As the next token, choose the output that is predicted to be most likely by the network.

_Feel free to change the default prompt to something more exciting._

In [None]:
def sample(model, sample_length=100, prompt="The"):
    # transform the prompt to a list of indices
    prompt = torch.tensor(encode(prompt), dtype=torch.long).reshape(1, -1).to(device)
    
    for _ in range(sample_length):
        next_token = None
        # TODO: sample the next token predicted from the decoder network.
        
        # add the new sample to the prompt:
        prompt = torch.cat((prompt, next_token.reshape(1, -1)), dim=1)

    # decode the generated sequence and print it
    t = decode(prompt.squeeze().tolist())
    print(t)


We can check if the sampling function works in principal by creating a model and sampling from the output. Since the model is not trained yet, the output will be just a collection of random letters here.

In [None]:
# sequence length is the maximum length an input sequence can have.
transformer = TransformerDecoder(num_layers=3, num_heads=4, embedding_dimension=128, sequence_length=256)
transformer.to(device)

# check if the model forward pass and sampling
sample(transformer, sample_length=20)

Now, we can start training our model using the code below.

_Note_: You can also change the network architecture but bigger models are slower to train and require more compute. Using the standard architecture outlined here, the training takes around 20 minutes on my laptop without any GPU access. You should see a significant improvement over the random sampling in the model but the output might not make a lot of sense due to the small scale of the computations used here. Training the model longer is not required to complete the lab.

In [None]:
optimizer = torch.optim.AdamW(transformer.parameters())

max_iter = 5_000
losses = []

eval_freq = 100

for i in range(max_iter):

    # get the training datta
    xb, yb = get_batch('train')
    
    # compute the predictions
    pred = transformer(xb)

    # reshape the output and target and compute the loss
    B, N, D = pred.shape
    logits = pred.view(B*N, D)
    targets = yb.view(B*N)
    
    loss = torch.nn.functional.cross_entropy(logits, targets)
    losses.append(loss.detach().item())

    # compute the backward pass and update parameters
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    # do evaluation
    if i % eval_freq == 0:
        print(f"Iteration: {i}\t loss: {loss.item():.2f}")
        sample(transformer)
        print("----")

Next, we can visualize the loss over the iterations

In [None]:
plt.plot(losses)
plt.grid()

Now that our model is trained, you can again draw samples from the model. You can also change the initial prompt to get different starting points for the sample drawing.

In [None]:
# draw samples from the trained model
prompt = "And..."
sample(transformer, prompt=prompt)

__Question 7__: Using the current sampling strategy, the network output is deterministic. How could you change the sampling such that the network can produce multiple output sequences for a single input? (_You do not have to implement these changes_)

Note that we use a very small language model here. If you want to get some better results you would need a larger model which requires more training. However, the implementation you have done here basically includes all the components that the GPT and similar model families include. By simply changing the hyper-parameters you can reimplement GPT-2 in this lab (which is the last model in the GPT architecture where the model is well described in a research paper).

More modern models add a lot of computational tricks in order to make the model more scalable and so on but the underlying principles are the same as the ones you have implemented yourself in this lab.

## Summary & Outlook

In this lab you have seen how to implement a transformer using a single head or multiple heads either using a decoder style model for sequence-to-sequence tasks and a small example for a decoder model. The code you have written includes everything the basic GPT models consist of.

Hopefully, this lab has given you some hands on experience with the transformer architecture to understand what is going on under the hood of these models that run here.

The examples shown here are toy examples that show the potential power of transformers but are in themselves fairly easy. However, transformers nowadays are capable of remarkable performances. While this course covers the basics for the transformer architecture, you will see similar models again in follow up courses:

- _Large Language Models and Societal Consequences of Artificial Intelligence_ where the focus will be on building an LLM application, leveraging transformers for text and language, and exploring their impact. Through the assignments, you will work with state-of-the-art LLMs.
- _Advanced Deep Learning for Image Processing_ is a course focusing on deep learning models for images and also covers the suprising competetivness of transformers on image data called vision transformers (ViT).