{ "cells": [ { "cell_type": "markdown", "id": "b084f2a2", "metadata": {}, "source": [ "# FNLP Lab 2: Char-RNN for Shakespeare\n", "\n", "This lab will guide you through building a basic character-level RNN for generating Shakespearean text in PyTorch.\n", "\n", "You will learn how to implement an RNN from scratch, and how to use PyTorch optimizers and learning rate schedulers in a custom training loop to train this RNN on a text dataset. You will evaluate performance with perplexity and loss. Finally, you will briefly explore sampling methods to generate novel text from this RNN.\n", "\n", "We strongly recommend using Google Colab for this lab." ] }, { "cell_type": "markdown", "id": "f289af37", "metadata": {}, "source": [ "## Setup Dependencies\n", "\n", "Note: if you're not using Colab, you may need to run this command from your terminal (while in the correct environment)\n", "\n", "e.g. `conda activate fnlp; pip install -r requirements.txt`" ] }, { "cell_type": "code", "execution_count": 1, "id": "81391936", "metadata": {}, "outputs": [], "source": [ "# ! pip install -r requirements.txt # uncomment for Google Colab" ] }, { "cell_type": "code", "execution_count": 2, "id": "a88f02ad", "metadata": {}, "outputs": [], "source": [ "device='cpu'" ] }, { "cell_type": "code", "execution_count": 3, "id": "ef6ea99d", "metadata": {}, "outputs": [], "source": [ "import torch\n", "from torch import nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from tqdm import tqdm\n", "import random" ] }, { "cell_type": "code", "execution_count": 4, "id": "0c142763", "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt" ] }, { "cell_type": "markdown", "id": "6e86489e", "metadata": {}, "source": [ "### Read In Text\n", "\n", "You don't need to change anything in this section - we simply read in the Shakespeare text." 
] }, { "cell_type": "code", "execution_count": 5, "id": "ae145bd2", "metadata": {}, "outputs": [], "source": [ "with open('small_shakespeare_input.txt', 'r') as f:\n", "    shakespeare = f.read()" ] }, { "cell_type": "code", "execution_count": 6, "id": "78b1de8e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "100000" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(shakespeare)" ] }, { "cell_type": "code", "execution_count": 7, "id": "cc431184", "metadata": {}, "outputs": [], "source": [ "alphabet = set(shakespeare)" ] }, { "cell_type": "code", "execution_count": 8, "id": "109c5298", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "62" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "alphabet = list(alphabet)\n", "len(alphabet)" ] }, { "cell_type": "code", "execution_count": 9, "id": "745671e4", "metadata": {}, "outputs": [], "source": [ "# create mapping from character <-> index (think of this as a naive tokenizer, where each token is one character)\n", "char_to_index = {a:i for i, a in enumerate(alphabet)}\n", "index_to_char = {i:a for i, a in enumerate(alphabet)}" ] }, { "cell_type": "code", "execution_count": 10, "id": "b7a61031", "metadata": {}, "outputs": [], "source": [ "dev_set = shakespeare[:10000] # this is a nice splitting point where the first chunk looks reasonable\n", "train_set = shakespeare[10000:]" ] }, { "cell_type": "code", "execution_count": 11, "id": "d365100e", "metadata": {}, "outputs": [], "source": [ "def get_chunks(dataset, snippet_size=150):\n", "    # convert dataset into chunks of text\n", "    # each snippet will be a little over snippet_size\n", "\n", "    cur_index = 0\n", "    cur_text = []\n", "    sections = dataset.split(\"\\n\\n\")\n", "    sections = sections[1:]\n", "    chunks = []\n", "    while cur_index < len(sections):\n", "        while len(\"\\n\\n\".join(cur_text)) < snippet_size and cur_index < len(sections):\n", "            cur_text.append(sections[cur_index])\n", "            cur_index += 1\n", "        chunks.append(\"\\n\\n\".join(cur_text).strip())\n", "        cur_text = []\n", "    return chunks" ] }, { "cell_type": "code", "execution_count": 12, "id": "e62eb067", "metadata": {}, "outputs": [], "source": [ "def chunk_to_target_tensor(chunk):\n", "    return torch.LongTensor([char_to_index[c] for c in chunk[1:]])" ] }, { "cell_type": "code", "execution_count": 13, "id": "d0202e1c", "metadata": {}, "outputs": [], "source": [ "def chunk_to_input_tensor(chunk):\n", "    return torch.cat([char_to_tensor(c) for c in chunk[:-1]])" ] }, { "cell_type": "code", "execution_count": 14, "id": "e7f82bfb", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\"Servant:\\nWhat, think you then the king shall be deposed?\\n\\nGardener:\\nDepress'd he is already, and deposed\\n'Tis doubt he will be: letters came last night\\nTo a dear friend of the good Duke of York's,\\nThat tell black tidings.\"" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "get_chunks(dev_set)[0]" ] }, { "cell_type": "code", "execution_count": 15, "id": "cc77e440", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'DUKE OF YORK:\\nTo do that office of thine own good will\\nWhich tired majesty did make thee offer,\\nThe resignation of thy state and crown\\nTo Henry Bolingbroke.'" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "get_chunks(train_set)[0]" ] }, { "cell_type": "markdown", "id": "08f8a30b", "metadata": {}, "source": [ "# Implement a Vanilla RNN from Scratch\n", "\n", "\n", "### What are RNNs?\n", "\n", "Recurrent Neural Networks (RNNs) are a class of neural networks useful for sequential data. Unlike traditional feedforward networks, RNNs maintain a 'hidden state' that carries information across sequence steps, allowing them to 'remember' information from prior inputs. 
\n", "\n", "At each time step (in your case, at each character):\n", "- The model combines the current input (e.g. the character 'h') with the hidden state from the previous step (a vector), using learned weight matrices.\n", "- This hidden state acts as a \"memory\" of the sequence.\n", "- The model predicts raw logits ($y_t$) that are converted into a prediction of the next token (in your case, the next character).\n", "\n", "The mathematical representation for the RNN you will implement is the following:\n", "\n", "At time step $t$:\n", "\n", "$$\n", "(1) \\ \\ \\ \\ \\ \\ \\ h_t = g_1(W_{ih}x_t + W_{hh}h_{t-1} + b_h)\n", "$$\n", "$$\n", "(2) \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ y_t = g_2(W_{ho}h_t + b_o)\n", "$$\n", "\n", "where:\n", "- $ h_t $: hidden state at time $ t $\n", "- $ x_t $: input at time $ t $\n", "- $ W_{ih}, W_{hh}, W_{ho} $: weight matrices\n", "- $ b_h, b_o $: biases\n", "- $ g_1, g_2 $: activation functions (e.g., $ \\tanh $ or ReLU); you will use $g_1 = \\tanh$ and no $g_2$ (i.e. $g_2$ is the identity)\n", "\n", "To convert your output $y_t$ into the input token for the next step ($x_{t+1}$) you will implement a basic sampling procedure. It's important to note that the output of your model, $y_t$, consists of raw logits (unnormalized scores) and is not a valid input to your model. There are more details below.\n", "\n", "Implementation notes: you will use softmax to convert the outputs of your model into a probability distribution, but this should **not** be done _inside_ your model, because the sampling procedure operates on the raw logits.\n", "\n", "Your RNN will return both $h_t$ (to feed to future steps) and $y_t$ (as your prediction of the next token). \n", "\n", "### How is our data represented?\n", "\n", "Unlike Assignment 1, you will not need anything beyond basic tokenization for these character-RNNs. 
Instead, we will feed each character individually into your RNN.\n", "\n", "Characters are commonly represented in one of two ways: \n", "\n", "1) as a one-hot vector (e.g. $[0, 1, 0, 0]$) where the index of the $1$ is the character-index of the input character\n", "2) as a character-id that will be converted into an embedding\n", "\n", "We will do the latter for this lab, as it will be a useful introduction before Assignment 2. This is a similar idea to the word-vectors from Assignment 1, but instead of having a separate training procedure (word2vec) we will learn the embeddings as we train. PyTorch provides a nice [Embedding](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html) class for this purpose.\n", "\n", "In practice $x_t$ (the input to your RNN layer) will be represented by a tensor with dimension $(\\text{batch_size}, \\text{embedding_size})$. Each element in the batch will be a vector representing a single character. The input to the original model (before the embedding layer) is a tensor with dimension $(\\text{batch_size})$, where each element is a character-id. We implement the overarching logic for you - you only need to concern yourself with the RNN and OutputLayer classes.\n", "\n", "The output from your OutputLayer, $y_t$, will be represented by a tensor with dimension $(\\text{batch_size}, |\\text{vocab}|)$. Each element in the output batch is a vector of raw logits provided by your model, **not** a valid character vector.\n", "\n", "By applying the softmax function ($S$) to the logits we can consider $S(y_t)$ a distribution over the vocab, i.e. the model's predicted probability for each character. The character with the highest probability under this distribution is the most likely next character (according to the model). 
We will come back to this idea when we sample from this distribution.\n", "\n", "\n", "This site may be a useful resource in addition to your lecture materials to get a sense for RNNs: https://stanford.edu/~shervine/teaching/cs-230/cheatsheet-recurrent-neural-networks" ] }, { "cell_type": "markdown", "id": "4b22059b", "metadata": {}, "source": [ "# Components of Your RNN\n", "\n", "**You will need to implement the following classes:**\n", "1. **RNN** (Equation 1):\n", "- Input: $x_t, h_{t-1}$; Output: $h_{t}$\n", "- Initialized with `input_size` (the size of $x_t$, i.e. the embedding size) and `hidden_size` (up to you; it determines how much complexity the RNN can model)\n", "- A direct implementation of Equation 1 uses two linear layers (each linear layer performs $xA^T + b$)\n", "- Your forward method will implement Equation 1\n", "\n", "One trick to be cognisant of is how to model $W_{ih}x_t + W_{hh}h_{t-1}$ - how might you do this with only one weight matrix, and in only one multiplication?\n", "\n", "2. **OutputLayer** (Equation 2):\n", "- Input: $h_{t}$; Output: $y_{t}$\n", "- Initialized with the hidden_size from your RNN and the output size (e.g. 
the vocab size)\n", "- This layer will take the output of your RNN and convert it (via learned weights) into the space of your vocabulary\n", "- **Do not normalize your output logits yet**; this normalization is usually done outside of your model.\n", "\n", "## TODO: Implement `RNN()` and `OutputLayer()`" ] }, { "cell_type": "code", "execution_count": 16, "id": "c43c76ae", "metadata": {}, "outputs": [], "source": [ "class RNN(nn.Module):\n", "    def __init__(self, input_size, hidden_size):\n", "        super(RNN, self).__init__()\n", "\n", "        self.input_size = input_size\n", "        self.hidden_size = hidden_size\n", "\n", "        # Linear transformation to go from input+hidden to hidden\n", "        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)\n", "\n", "    def forward(self, X, H):\n", "        # X is the current input, H is the last hidden state\n", "        # X has shape (batch_size, input_size)\n", "        # H has shape (batch_size, hidden_size)\n", "        # Concatenate the input and previous hidden state\n", "\n", "        concatenated_input = torch.cat((X, H), -1) # Shape: (batch_size, input_size + hidden_size)\n", "\n", "        # Calculate the new hidden state\n", "        H_new = torch.tanh(self.i2h(concatenated_input)) # Non-linear activation\n", "\n", "        return H_new\n", "\n", "# Output layer to convert hidden state to output logits of dimension |vocab size|\n", "class OutputLayer(nn.Module):\n", "    def __init__(self, hidden_size, output_size):\n", "        super(OutputLayer, self).__init__()\n", "\n", "        self.h2o = nn.Linear(hidden_size, output_size)\n", "\n", "    def forward(self, hidden_state):\n", "        # Apply the output layer to the hidden state to get logits\n", "        output = self.h2o(hidden_state)\n", "\n", "        return output\n", "\n", "# Wrapper class that combines both RNN and OutputLayer\n", "class RNNModel(nn.Module):\n", "    def __init__(self, input_size, hidden_size, embedding_size=124):\n", "        super(RNNModel, self).__init__()\n", "\n", "        self.input_size = input_size\n", "        self.hidden_size = hidden_size\n", "        # Initialize the RNN and the OutputLayer\n", "        self.emb = nn.Embedding(input_size, embedding_size)\n", "        self.rnn = RNN(embedding_size, hidden_size)\n", "        self.output_layer = OutputLayer(hidden_size, input_size)\n", "\n", "    def forward(self, X, H):\n", "        # Pass input through the RNN to get the new hidden state\n", "        X_emb = self.emb(X)\n", "# H_new = self.rnn(X, H)\n", "        H_new = self.rnn(X_emb, H)\n", "\n", "        # Get the logits from the output layer\n", "        output_logits = self.output_layer(H_new)\n", "\n", "        return output_logits, H_new\n", "\n", "    def init_hidden(self, batch_size=1):\n", "        # Return initial hidden state - we initialize this to zero for consistency\n", "        return torch.zeros(batch_size, self.hidden_size)" ] }, { "cell_type": "code", "execution_count": 17, "id": "32714875", "metadata": {}, "outputs": [], "source": [ "# Helper function to convert characters to tensor indices\n", "def char_to_tensor(char):\n", "    return torch.LongTensor([char_to_index[char]])" ] }, { "cell_type": "markdown", "id": "2ee77dfe", "metadata": {}, "source": [ "### Sanity Check for Tensor Sizes" ] }, { "cell_type": "code", "execution_count": 18, "id": "3b5a333b", "metadata": {}, "outputs": [], "source": [ "vocab_size = len(char_to_index)" ] }, { "cell_type": "code", "execution_count": 19, "id": "2cf4484e", "metadata": {}, "outputs": [], "source": [ "rnn = RNN(vocab_size, 256)\n", "output_layer = OutputLayer(256, vocab_size)\n", "rnnmodel = RNNModel(vocab_size, 256)" ] }, { "cell_type": "code", "execution_count": 20, "id": "2a5fcd07", "metadata": {}, "outputs": [], "source": [ "X = torch.rand((5, vocab_size))\n", "h = torch.rand((5, 256))" ] }, { "cell_type": "code", "execution_count": 21, "id": "b719eb7b", "metadata": {}, "outputs": [], "source": [ "h_new = rnn(X, h)\n", "assert h_new.shape == (5, 256)" ] }, { "cell_type": "code", "execution_count": 22, "id": "12a8e425", "metadata": {}, "outputs": [], "source": [ "output = output_layer(h)\n", "assert output.shape == (5, 
vocab_size)" ] }, { "cell_type": "code", "execution_count": 23, "id": "2522e194", "metadata": {}, "outputs": [], "source": [ "X = chunk_to_input_tensor('abcd') # note this method ignores the last character\n", "h = rnnmodel.init_hidden(batch_size=3)\n", "output_logits, h_new = rnnmodel(X, h)\n", "assert h_new.shape == (3, 256)\n", "assert output_logits.shape == (3, vocab_size)" ] }, { "cell_type": "markdown", "id": "39eb5063", "metadata": {}, "source": [ "# Measuring Performance: Perplexity\n", "\n", "In addition to loss, it can be useful to measure a model's _perplexity_ during training. In this section you will implement the perplexity metric for a given model across a dataset.\n", "\n", "### PPL Background (can skip, or read Jurafsky Chapter 3.3)\n", "\n", "_Perplexity_ is a commonly used metric for how well a model predicts a given dataset. In the context of language models, we use perplexity to see if the model is good at predicting the text in our dataset.\n", "\n", "More concretely, we want a model that is not 'surprised' by the tokens in a given text. Remember your model will output a probability distribution over your vocab. Iterating over your text, a perfect model would assign a probability of 1 to the correct tokens ('correct' means the ones in your dataset) and 0 to all others. We want this metric to be normalized by length so that we can compare the surprise on different datasets. Remember, **lower perplexity is better**.\n", "\n", "We define the probability of producing a given sequence as:\n", "\n", "$$\n", "P(s) = \\prod_{i=1}^{N} P(s_i | s_{