|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import argparse |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +import torch |
| 6 | + |
| 7 | +import npfl138 |
| 8 | +npfl138.require_version("2425.5") |
| 9 | +from npfl138.datasets.mnist import MNIST |
| 10 | + |
# Command-line interface of the script.
parser = argparse.ArgumentParser()
# ReCodEx sets the following arguments itself, so changing their defaults here has no effect there.
parser.add_argument("--batch_size", type=int, default=50, help="Batch size.")
parser.add_argument("--cnn", type=str, default="5-3-2,10-3-2", help="CNN architecture.")
parser.add_argument("--epochs", type=int, default=5, help="Number of epochs.")
parser.add_argument("--learning_rate", type=float, default=0.1, help="Learning rate.")
parser.add_argument("--recodex", action="store_true", default=False, help="Evaluation in ReCodEx.")
parser.add_argument("--seed", type=int, default=42, help="Random seed.")
parser.add_argument("--threads", type=int, default=1, help="Maximum number of threads to use.")
parser.add_argument("--verify", action="store_true", default=False, help="Verify the implementation.")
# Any additional arguments you add are kept by ReCodEx with your default values.
| 22 | + |
| 23 | + |
class Convolution:
    """2D convolution without padding, followed by ReLU, with hand-written gradients.

    Tensors use the NHWC layout, so the MNIST images have shape [28, 28, 1].
    The forward and backward passes iterate only over the kernel offsets; all
    other dimensions (batch, spatial positions, channels) are processed by
    vectorized tensor operations.
    """

    def __init__(
        self, filters: int, kernel_size: int, stride: int, input_shape: list[int], seed: int, verify: bool,
    ) -> None:
        """Create the layer variables.

        Args:
            filters: Number of output channels.
            kernel_size: Height and width of the (square) kernel.
            stride: Step between neighbouring kernel applications, in both spatial dimensions.
            input_shape: Shape of a single input example; only `input_shape[2]`
                (the number of input channels) is read here.
            seed: Accepted for interface compatibility; not used by this constructor.
            verify: When true, `forward` and `backward` compare their results
                against the PyTorch reference implementation.
        """
        self._filters = filters
        self._kernel_size = kernel_size
        self._stride = stride
        self._verify = verify

        # The kernel has shape [kernel_height, kernel_width, input_channels, output_channels],
        # the bias has shape [output_channels].
        self._kernel = torch.nn.Parameter(torch.randn(kernel_size, kernel_size, input_shape[2], filters) * 0.1)
        self._bias = torch.nn.Parameter(torch.zeros(filters))

    def _window(self, tensor: torch.Tensor, row: int, col: int, out_height: int, out_width: int) -> torch.Tensor:
        """Return the strided view of `tensor` seen by the kernel element at offset (`row`, `col`).

        The result has shape [batch, out_height, out_width, channels]; position
        (y, x) of the view is the element at (row + y * stride, col + x * stride)
        of `tensor`. Basic slicing is used, so the result is a view and in-place
        operations on it modify `tensor`.
        """
        stride = self._stride
        return tensor[
            :,
            row:row + (out_height - 1) * stride + 1:stride,
            col:col + (out_width - 1) * stride + 1:stride,
        ]

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Compute `relu(conv(inputs) + bias)`.

        Args:
            inputs: Tensor of shape [batch, height, width, input_channels].

        Returns:
            Tensor of shape [batch, out_height, out_width, filters], where
            `out_size = (size - kernel_size) // stride + 1`.
        """
        out_height = (inputs.shape[1] - self._kernel_size) // self._stride + 1
        out_width = (inputs.shape[2] - self._kernel_size) // self._stride + 1

        # Every kernel element contributes a matrix product of the corresponding
        # strided input window with its [input_channels, output_channels] slice.
        # The bias is broadcast over the batch and spatial dimensions.
        pre_activation = self._bias
        for row in range(self._kernel_size):
            for col in range(self._kernel_size):
                window = self._window(inputs, row, col, out_height, out_width)
                pre_activation = pre_activation + window @ self._kernel[row, col]
        output = torch.relu(pre_activation)

        # If requested, verify that `output` contains a correct value.
        if self._verify:
            reference = torch.relu(torch.nn.functional.conv2d(
                inputs.movedim(-1, 1), self._kernel.permute(3, 2, 0, 1), self._bias, self._stride)).movedim(1, -1)
            np.testing.assert_allclose(output.detach().numpy(), reference.detach().numpy(), atol=1e-4,
                                       err_msg="Forward pass differs!")

        return output

    def backward(
        self, inputs: torch.Tensor, outputs: torch.Tensor, outputs_gradient: torch.Tensor
    ) -> tuple[torch.Tensor, list[torch.Tensor], list[torch.Tensor]]:
        """Backpropagate through the layer.

        Args:
            inputs: The tensor that was passed to `forward`.
            outputs: The tensor that `forward` returned for `inputs`.
            outputs_gradient: Gradient of the loss with respect to `outputs`.

        Returns:
            A triple `(inputs_gradient, [kernel, bias], [kernel_gradient, bias_gradient])`.
        """
        out_height, out_width = outputs_gradient.shape[1], outputs_gradient.shape[2]

        # ReLU passes the gradient only where its output is positive.
        pre_activation_gradient = outputs_gradient * (outputs > 0)

        # The bias is added to every batch example and spatial position.
        bias_gradient = pre_activation_gradient.sum(dim=(0, 1, 2))

        kernel_gradient = torch.zeros_like(self._kernel)
        inputs_gradient = torch.zeros_like(inputs)
        for row in range(self._kernel_size):
            for col in range(self._kernel_size):
                # Kernel element (row, col) multiplied exactly this input window, so its gradient
                # is the window contracted with the gradient over batch and spatial dimensions.
                window = self._window(inputs, row, col, out_height, out_width)
                kernel_gradient[row, col] = torch.tensordot(
                    window, pre_activation_gradient, dims=([0, 1, 2], [0, 1, 2]))

                # The same window of the inputs receives the gradient mapped back through
                # the transposed kernel slice; overlapping windows accumulate.
                self._window(inputs_gradient, row, col, out_height, out_width).add_(
                    pre_activation_gradient @ self._kernel[row, col].T)

        # If requested, verify that the three computed gradients are correct.
        if self._verify:
            with torch.enable_grad():
                inputs.requires_grad_(True)
                inputs.grad = self._kernel.grad = self._bias.grad = None
                reference = (outputs > 0) * torch.nn.functional.conv2d(
                    inputs.movedim(-1, 1), self._kernel.permute(3, 2, 0, 1), self._bias, self._stride).movedim(1, -1)
                reference.backward(gradient=outputs_gradient, inputs=[inputs, self._kernel, self._bias])
                for name, computed, expected in zip(
                        ["Bias", "Kernel", "Inputs"], [bias_gradient, kernel_gradient, inputs_gradient],
                        [self._bias.grad, self._kernel.grad, inputs.grad]):
                    np.testing.assert_allclose(computed.detach().numpy(), expected.detach().numpy(),
                                               atol=2e-4, err_msg=name + " gradient differs!")

        # Return the inputs gradient, the layer variables, and their gradients.
        return inputs_gradient, [self._kernel, self._bias], [kernel_gradient, bias_gradient]
| 86 | + |
| 87 | + |
class Model:
    """Stack of `Convolution` layers followed by a linear classifier, trained by manual SGD."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Build the layers described by `args.cnn`.

        `args.cnn` is a comma-separated list of `filters-kernel_size-stride`
        triples, one per convolutional layer.
        """
        self._args = args

        # Create the convolutional layers according to `args.cnn`, tracking the
        # shape of a single example as it passes through the unpadded convolutions.
        input_shape = [MNIST.H, MNIST.W, MNIST.C]
        self._convs = []
        for layer in args.cnn.split(","):
            filters, kernel_size, stride = map(int, layer.split("-"))
            self._convs.append(Convolution(filters, kernel_size, stride, input_shape, args.seed, args.verify))
            input_shape = [(input_shape[0] - kernel_size) // stride + 1,
                           (input_shape[1] - kernel_size) // stride + 1, filters]

        # Create the classification head; the NumPy integer is converted to a plain
        # `int` before being used as the number of input features.
        self._classifier = torch.nn.Linear(int(np.prod(input_shape)), MNIST.LABELS)

    @torch.no_grad()
    def train_epoch(self, dataset: MNIST.Dataset) -> None:
        """Run one epoch of training with manually computed gradients.

        Autograd is disabled for the whole method, because the parameters are
        updated in place (the optional verification in `Convolution.backward`
        re-enables it locally).
        """
        for batch in dataset.batches(self._args.batch_size, shuffle=True):
            # Forward pass through the convolutions, remembering every intermediate value.
            # The images are converted to floats, axis 1 is moved to the end, and values are scaled by 1/255.
            hidden = batch["images"].to(torch.float32).movedim(1, -1) / 255
            conv_values = [hidden]
            for conv in self._convs:
                hidden = conv.forward(hidden)
                conv_values.append(hidden)

            # Run the classification head.
            hidden_flat = torch.flatten(hidden, 1)
            predictions = self._classifier(hidden_flat).softmax(dim=-1)

            # Compute the gradients of the classifier and the convolution output.
            # `(softmax - one_hot) / batch_size` corresponds to the gradient of the
            # batch-averaged cross-entropy with respect to the logits.
            one_hot_labels = torch.nn.functional.one_hot(batch["labels"].to(torch.int64), MNIST.LABELS)
            d_logits = (predictions - one_hot_labels) / len(batch["images"])
            variables = [self._classifier.bias, self._classifier.weight]
            gradients = [d_logits.sum(dim=0), d_logits.T @ hidden_flat]
            hidden_gradient = (d_logits @ self._classifier.weight).reshape(hidden.shape)

            # Backpropagate the gradient through the convolutions, from the last one to the first.
            for conv, inputs, outputs in reversed(list(zip(self._convs, conv_values[:-1], conv_values[1:]))):
                hidden_gradient, conv_variables, conv_gradients = conv.backward(inputs, outputs, hidden_gradient)
                variables.extend(conv_variables)
                gradients.extend(conv_gradients)

            # Update the weights using a manual SGD.
            for variable, gradient in zip(variables, gradients):
                variable -= self._args.learning_rate * gradient

    @torch.no_grad()
    def evaluate(self, dataset: MNIST.Dataset) -> float:
        """Return the accuracy on `dataset` as a Python float in the range [0, 1]."""
        total = correct = 0
        for batch in dataset.batches(self._args.batch_size):
            hidden = batch["images"].to(torch.float32).movedim(1, -1) / 255
            for conv in self._convs:
                hidden = conv.forward(hidden)
            hidden = torch.flatten(hidden, 1)
            predictions = self._classifier(hidden)
            # Softmax is monotonic, so the argmax of the logits is the predicted class.
            correct += int(torch.sum(predictions.argmax(dim=-1) == batch["labels"]))
            total += len(batch["labels"])
        return correct / total
| 145 | + |
| 146 | + |
def main(args: argparse.Namespace) -> tuple[float, float]:
    """Train the model for `args.epochs` epochs and report the accuracies.

    After every epoch the development accuracy is printed; after the last epoch
    the test accuracy is printed as well.

    Returns:
        The final development accuracy and the test accuracy.
    """
    # Set the random seed and the number of threads.
    npfl138.startup(args.seed, args.threads)
    npfl138.global_keras_initializers()

    # Do not compute gradients in this assignment.
    torch.set_grad_enabled(False)

    # Load data, using only 5 000 training images, and create the dataloaders.
    mnist = MNIST(sizes={"train": 5_000})

    # Create the model.
    model = Model(args)

    epochs_run = 0
    dev_accuracy = None
    for epoch in range(args.epochs):
        model.train_epoch(mnist.train)

        dev_accuracy = model.evaluate(mnist.dev)
        print("Dev accuracy after epoch {} is {:.2f}".format(epoch + 1, 100 * dev_accuracy))
        epochs_run = epoch + 1

    # With zero epochs the loop body never runs; evaluate the untrained model
    # so that a development accuracy is still returned instead of failing.
    if dev_accuracy is None:
        dev_accuracy = model.evaluate(mnist.dev)

    test_accuracy = model.evaluate(mnist.test)
    print("Test accuracy after epoch {} is {:.2f}".format(epochs_run, 100 * test_accuracy))

    # Return dev and test accuracies for ReCodEx to validate.
    return dev_accuracy, test_accuracy
| 172 | + |
| 173 | + |
if __name__ == "__main__":
    # If `__file__` is not defined (presumably interactive execution), parse an empty
    # argument list so only the defaults apply; otherwise parse the real command line.
    main_args = parser.parse_args(None if "__file__" in globals() else [])
    main(main_args)
0 commit comments