The Art of Noise
Understanding and implementing a diffusion model from scratch with PyTorch

Introduction
In my last several articles I talked about generative deep learning algorithms, which were mostly related to text generation tasks. So, I think it would be interesting to switch to generative algorithms for image generation now. There are plenty of deep learning models specialized for generating images, such as the Autoencoder, Variational Autoencoder (VAE), Generative Adversarial Network (GAN), and Neural Style Transfer (NST). I have written about some of these topics on Medium as well; the links are at the end of this article if you want to read them.
In today’s article, I would like to discuss the so-called diffusion model, one of the most impactful models in the field of deep learning for image generation. The idea of this algorithm was first proposed in the paper titled Deep Unsupervised Learning using Nonequilibrium Thermodynamics written by Sohl-Dickstein et al. back in 2015 [1]. Their framework was then developed further by Ho et al. in 2020 in their paper titled Denoising Diffusion Probabilistic Models [2]. DDPM was later adapted by OpenAI and Google to develop DALL·E 2 and Imagen, models known for their impressive ability to generate high-quality images.
How Diffusion Model Works
Generally speaking, a diffusion model works by generating images from noise. We can think of it like an artist transforming a splash of paint on a canvas into a beautiful artwork. In order to do so, the diffusion model needs to be trained first. Training consists of two main processes, namely forward diffusion and backward diffusion.
As you can see in the above figure, forward diffusion is a process where Gaussian noise is applied to the original image iteratively. We keep adding the noise until the image is completely unrecognizable, at which point we can say that the image now lies in the latent space. Different from Autoencoders and GANs, where the latent space typically has a lower dimension than the original image, the latent space in DDPM maintains the exact same dimensionality as the original image. This noising process follows the principle of a Markov Chain, meaning that the image at timestep t depends only on the image at timestep t-1. Forward diffusion is considered easy since all we basically do is add some noise step by step.
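For reference, a single noising step can be written as the Gaussian transition used in the DDPM paper [2], where βₜ controls how much noise is added at timestep t:

q(x_t \mid x_{t-1}) = \mathcal{N}\!\left(x_t;\ \sqrt{1-\beta_t}\, x_{t-1},\ \beta_t \mathbf{I}\right)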
The second training phase is called backward diffusion, where our objective is to remove the noise little by little until we obtain a clear image. This process follows the principle of the reverse Markov Chain, where the image at timestep t-1 can only be obtained based on the image at timestep t. Such a denoising process is really difficult since we need to guess which pixels are noise and which ones belong to the actual image content. Thus, we need to employ a neural network model to do so.
DDPM uses U-Net as the basis of the deep learning architecture for backward diffusion. However, instead of using the original U-Net model [4], we need to make several modifications to it so that it will be more suitable for our task. Later on, I am going to train this model on the MNIST Handwritten Digit dataset [5], and we will see whether it can generate similar images.
Well, that was pretty much all the fundamental concepts you need to know about diffusion models for now. In the next sections we are going to get even deeper into the details while implementing the algorithm from scratch.
PyTorch Implementation
We are going to start by importing the required modules. In case you’re not yet familiar with the imports below, both torch and torchvision are the libraries we’ll use for preparing the model and the dataset, while matplotlib and tqdm will help us display images and progress bars.
# Codeblock 1
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm
As the modules have been imported, the next thing to do is to initialize some config parameters. Look at the Codeblock 2 below for the details.
# Codeblock 2
IMAGE_SIZE = 28 #(1)
NUM_CHANNELS = 1 #(2)
BATCH_SIZE = 2
NUM_EPOCHS = 10
LEARNING_RATE = 0.001
NUM_TIMESTEPS = 1000 #(3)
BETA_START = 0.0001 #(4)
BETA_END = 0.02 #(5)
TIME_EMBED_DIM = 32 #(6)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") #(7)
DEVICE
# Codeblock 2 Output
device(type='cuda')
At the lines marked with #(1) and #(2) I set IMAGE_SIZE and NUM_CHANNELS to 28 and 1, which are the image dimensions in the MNIST dataset. The BATCH_SIZE, NUM_EPOCHS, and LEARNING_RATE variables are pretty straightforward, so I don’t think I need to explain them further.
At line #(3), the variable NUM_TIMESTEPS denotes the number of iterations in the forward and backward diffusion processes. Timestep 0 is the condition where the image is in its original state (the leftmost image in Figure 1). In this case, since we set this parameter to 1000, timestep 999 is going to be the condition where the image is completely unrecognizable (the rightmost image in Figure 1). It is important to keep in mind that the choice of the number of timesteps involves a tradeoff between model accuracy and computational cost. If we assign a small value to NUM_TIMESTEPS, the inference time is going to be shorter, yet the resulting image might not be very good since the model has fewer steps to refine the image in the backward diffusion stage. On the other hand, increasing NUM_TIMESTEPS will slow down the inference process, but we can expect the output image to have better quality thanks to the gradual denoising process, which results in a more precise reconstruction.
Next, the BETA_START (#(4)) and BETA_END (#(5)) variables are used to control the amount of Gaussian noise added at each timestep, whereas TIME_EMBED_DIM (#(6)) determines the length of the feature vector used for storing the timestep information. Lastly, at line #(7) I assign "cuda" to the DEVICE variable if PyTorch detects a GPU installed on our machine. I highly recommend you run this project on a GPU since training a diffusion model is computationally expensive. Note that the values set for NUM_TIMESTEPS, BETA_START and BETA_END are all adopted directly from the DDPM paper [2].
The complete implementation will be done in several steps: constructing the U-Net model, preparing the dataset, defining the noise scheduler for the diffusion process, training, and inference. We are going to discuss each of those stages in the following sub-sections.
The U-Net Architecture: Time Embedding
As I’ve mentioned earlier, the basis of a diffusion model is U-Net. This architecture is used because its output layer is suitable for representing an image, which makes sense since it was initially introduced for image segmentation in the first place. The following figure shows what the original U-Net architecture looks like.
However, it is necessary to modify this architecture so that it can also take into account the timestep information. Not only that, since we will only use the MNIST dataset, we also need to make the model smaller. Just remember the convention in deep learning that simpler models are often more effective for simple tasks.
In the figure below I show you the modified U-Net model. Here you can see that the time embedding tensor is injected into the model at every stage, which will later be done by element-wise summation, allowing the model to capture the timestep information. Next, instead of repeating each of the downsampling and upsampling stages four times like the original U-Net, we will only repeat each of them twice. Additionally, it is worth noting that the stack of downsampling stages is also known as the encoder, whereas the stack of upsampling stages is often called the decoder.
Now let’s start constructing the architecture by creating a class for generating the time embedding tensor. The idea here is similar to the positional encoding in the Transformer. See Codeblock 3 below for the details.
# Codeblock 3
class TimeEmbedding(nn.Module):
    def forward(self):
        time = torch.arange(NUM_TIMESTEPS, device=DEVICE).reshape(NUM_TIMESTEPS, 1)
        print(f"time\t\t: {time.shape}")

        i = torch.arange(0, TIME_EMBED_DIM, 2, device=DEVICE)
        denominator = torch.pow(10000, i/TIME_EMBED_DIM)
        print(f"denominator\t: {denominator.shape}")

        even_time_embed = torch.sin(time/denominator)    #(1)
        odd_time_embed = torch.cos(time/denominator)     #(2)
        print(f"even_time_embed\t: {even_time_embed.shape}")
        print(f"odd_time_embed\t: {odd_time_embed.shape}")

        stacked = torch.stack([even_time_embed, odd_time_embed], dim=2)    #(3)
        print(f"stacked\t\t: {stacked.shape}")
        time_embed = torch.flatten(stacked, start_dim=1, end_dim=2)    #(4)
        print(f"time_embed\t: {time_embed.shape}")

        return time_embed
What we basically do in the above code is create a tensor of size NUM_TIMESTEPS × TIME_EMBED_DIM (1000×32), where every single row contains the information for one timestep. Later on, each of the 1000 timesteps will be represented by a feature vector of length 32. The values in the tensor themselves are obtained based on the two equations in Figure 4. In Codeblock 3 above, these two equations are implemented at lines #(1) and #(2), each forming a tensor of size 1000×16. Next, these tensors are combined using the code at lines #(3) and #(4).
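Since Figure 4 is an image, here are the two equations it shows, written out; they are the same sinusoidal formulas as the positional encoding in the Transformer [6], with t denoting the timestep, d = TIME_EMBED_DIM, and 2i and 2i+1 the even and odd embedding indices:

PE_{(t,\ 2i)} = \sin\!\left(\frac{t}{10000^{2i/d}}\right), \qquad PE_{(t,\ 2i+1)} = \cos\!\left(\frac{t}{10000^{2i/d}}\right)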
Here I also print out every single step done in the above codeblock so that you can get a better understanding of what is actually being done inside the TimeEmbedding class. If you still want more explanation about the above code, feel free to read my previous post about the Transformer, which you can access through the link at the end of this article. Once you click the link, just scroll all the way down to the Positional Encoding section.
Now let’s check if the TimeEmbedding class works properly using the following testing code. The resulting output shows that it successfully produced a tensor of size 1000×32, which is exactly what we expected earlier.
# Codeblock 4
time_embed_test = TimeEmbedding()
out_test = time_embed_test()
# Codeblock 4 Output
time : torch.Size([1000, 1])
denominator : torch.Size([16])
even_time_embed : torch.Size([1000, 16])
odd_time_embed : torch.Size([1000, 16])
stacked : torch.Size([1000, 16, 2])
time_embed : torch.Size([1000, 32])
The U-Net Architecture: DoubleConv
If you take a closer look at the modified architecture, you will see that we actually have lots of repeating patterns, such as the ones highlighted in the yellow boxes in the following figure.
Figure 5. The repeating structures to be implemented as the DoubleConv class [3].
These five yellow boxes share the same structure: two convolution layers with the time embedding tensor injected right after the first convolution operation is performed. So, what we are going to do now is create another class named DoubleConv to reproduce this structure. Look at Codeblocks 5a and 5b below to see how I do that.
# Codeblock 5a
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):    #(1)
        super().__init__()

        self.conv_0 = nn.Conv2d(in_channels=in_channels,    #(2)
                                out_channels=out_channels,
                                kernel_size=3,
                                bias=False,
                                padding=1)
        self.bn_0 = nn.BatchNorm2d(num_features=out_channels)    #(3)

        self.time_embedding = TimeEmbedding()    #(4)
        self.linear = nn.Linear(in_features=TIME_EMBED_DIM,    #(5)
                                out_features=out_channels)

        self.conv_1 = nn.Conv2d(in_channels=out_channels,    #(6)
                                out_channels=out_channels,
                                kernel_size=3,
                                bias=False,
                                padding=1)
        self.bn_1 = nn.BatchNorm2d(num_features=out_channels)    #(7)

        self.relu = nn.ReLU(inplace=True)    #(8)
The two inputs of the __init__() method above give us the flexibility to configure the number of input and output channels (#(1)), so that the DoubleConv class can be used to instantiate all five yellow boxes simply by adjusting its input arguments. As the name suggests, here we initialize two convolution layers (lines #(2) and #(6)), each followed by a batch normalization layer and a ReLU activation function. Keep in mind that the two normalization layers need to be initialized separately (lines #(3) and #(7)) since each of them has its own trainable normalization parameters. Meanwhile, the ReLU activation function only needs to be initialized once (#(8)) because it contains no parameters, allowing it to be used multiple times in different parts of the network. At line #(4), we initialize the TimeEmbedding layer we created earlier, which will later be connected to a standard linear layer (#(5)). This linear layer is responsible for adjusting the dimension of the time embedding tensor so that the resulting output can be summed with the output from the first convolution layer in an element-wise manner.
Now let’s take a look at Codeblock 5b below to better understand the flow of the DoubleConv block. Here you can see that the forward() method accepts two inputs: the raw image x and the timestep information t, as shown at line #(1). We initially process the image with the first Conv-BN-ReLU sequence (#(2–4)). This Conv-BN-ReLU structure is typically used when working with CNN-based models, even if the illustration does not explicitly show the batch normalization and ReLU layers. Apart from the image, we then take the timestep information of the corresponding image from our embedding tensor (#(5)) and pass it through the linear layer (#(6)). We still need to expand the dimension of the resulting tensor using the code at line #(7) before performing the element-wise summation at line #(8). Finally, we process the resulting tensor with the second Conv-BN-ReLU sequence (#(9–11)).
# Codeblock 5b
    def forward(self, x, t):    #(1)
        print(f'images\t\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')

        x = self.conv_0(x)    #(2)
        x = self.bn_0(x)      #(3)
        x = self.relu(x)      #(4)
        print(f'\nafter first conv\t: {x.size()}')

        time_embed = self.time_embedding()[t]    #(5)
        print(f'\ntime_embed\t\t: {time_embed.size()}')

        time_embed = self.linear(time_embed)    #(6)
        print(f'time_embed after linear\t: {time_embed.size()}')

        time_embed = time_embed[:, :, None, None]    #(7)
        print(f'time_embed expanded\t: {time_embed.size()}')

        x = x + time_embed    #(8)
        print(f'\nafter summation\t\t: {x.size()}')

        x = self.conv_1(x)    #(9)
        x = self.bn_1(x)      #(10)
        x = self.relu(x)      #(11)
        print(f'after second conv\t: {x.size()}')

        return x
To see if our DoubleConv implementation works properly, we are going to test it with Codeblock 6 below. Here I want to simulate the very first instance of this block, which corresponds to the leftmost yellow box in Figure 5. To do so, we need to set the in_channels and out_channels parameters to 1 and 64, respectively (#(1)). Next, we initialize two input tensors, namely x_test and t_test. The x_test tensor has the size of 2×1×28×28, representing a batch of two grayscale images of size 28×28 (#(2)). Keep in mind that this is just a dummy tensor of random values which will be replaced with the actual images from the MNIST dataset later in the training phase. Meanwhile, t_test is a tensor containing the timestep numbers of the corresponding images (#(3)). The values for this tensor are randomly selected between 0 and NUM_TIMESTEPS (1000). Note that the datatype of this tensor must be an integer since the numbers will be used for indexing, as shown at line #(5) back in Codeblock 5b. Lastly, at line #(4) we pass both the x_test and t_test tensors to the double_conv_test layer.
By the way, I re-ran the previous codeblocks with the print() functions removed prior to running the following code so that the outputs will look neater.
# Codeblock 6
double_conv_test = DoubleConv(in_channels=1, out_channels=64).to(DEVICE) #(1)
x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE) #(2)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE) #(3)
out_test = double_conv_test(x_test, t_test) #(4)
# Codeblock 6 Output
images : torch.Size([2, 1, 28, 28]) #(1)
timesteps : torch.Size([2]), tensor([468, 304], device='cuda:0') #(2)
after first conv : torch.Size([2, 64, 28, 28]) #(3)
time_embed : torch.Size([2, 32]) #(4)
time_embed after linear : torch.Size([2, 64])
time_embed expanded : torch.Size([2, 64, 1, 1]) #(5)
after summation : torch.Size([2, 64, 28, 28]) #(6)
after second conv : torch.Size([2, 64, 28, 28]) #(7)
The shape of our original input tensors can be seen at lines #(1) and #(2) in the above output. Specifically at line #(2), I also print out the two timesteps that we selected randomly. In this example we assume that each of the two images in the x tensor has already been noised with the noise levels of the 468th and 304th timesteps prior to being fed into the network. We can see that the shape of the image tensor x changes to 2×64×28×28 after being passed through the first convolution layer (#(3)). Meanwhile, the size of our time embedding tensor becomes 2×32 (#(4)), which is obtained by extracting rows 468 and 304 from the original embedding of size 1000×32. In order to allow the element-wise summation to be performed (#(6)), we need to map the 32-dimensional time embedding vectors to 64 dimensions and expand their axes, resulting in a tensor of size 2×64×1×1 (#(5)) so that it can be broadcast to the 2×64×28×28 tensor. After the summation is done, we then pass the tensor through the second convolution layer, at which point the tensor dimension does not change at all (#(7)).
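To make the broadcasting described above a bit more concrete, here is a tiny standalone sketch (not part of the model itself) showing how a 2×64×1×1 tensor is added to a 2×64×28×28 feature map:

# minimal broadcasting sketch, independent of the model above
feature_map = torch.randn(2, 64, 28, 28)   # output of the first convolution
time_vec = torch.randn(2, 64, 1, 1)        # time embedding after the linear layer and axis expansion
print((feature_map + time_vec).shape)      # torch.Size([2, 64, 28, 28])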
The U-Net Architecture: Encoder
As we have successfully implemented the DoubleConv block, the next step is to implement the so-called DownSample block. In Figure 6 below, this corresponds to the parts enclosed in the red box.
Figure 6. The DownSample blocks [3].
The purpose of a DownSample block is to reduce the spatial dimension of an image, but it is important to note that at the same time it increases the number of channels. In order to achieve this, we can simply stack a DoubleConv block and a maxpooling operation. In this case the pooling uses a 2×2 kernel with a stride of 2, causing the spatial dimension of the output to be half that of the input. The implementation of this block can be seen in Codeblock 7 below.
# Codeblock 7
class DownSample(nn.Module):
    def __init__(self, in_channels, out_channels):    #(1)
        super().__init__()
        self.double_conv = DoubleConv(in_channels=in_channels,    #(2)
                                      out_channels=out_channels)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)    #(3)

    def forward(self, x, t):    #(4)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')

        convolved = self.double_conv(x, t)    #(5)
        print(f'\nafter double conv\t: {convolved.size()}')

        maxpooled = self.maxpool(convolved)    #(6)
        print(f'after pooling\t\t: {maxpooled.size()}')

        return convolved, maxpooled    #(7)
Here I set the __init__() method to take the number of input and output channels so that we can use it for creating the two DownSample blocks highlighted in Figure 6 without needing to write them as separate classes (#(1)). Next, the DoubleConv and maxpooling layers are initialized at lines #(2) and #(3), respectively. Remember that since the DoubleConv block accepts the image x and the corresponding timestep t as inputs, we also need to set the forward() method of this DownSample block to accept both of them as well (#(4)). The information contained in x and t is then combined as the two tensors are processed by the double_conv layer, whose output is stored in the variable named convolved (#(5)). Afterwards, we perform the actual downsampling with the maxpooling operation at line #(6), producing a tensor named maxpooled. It is important to note that both the convolved and maxpooled tensors are going to be returned. This is done because we will later bring maxpooled to the next downsampling stage, whereas the convolved tensor will be transferred directly to the upsampling stage in the decoder through a skip connection.
Now let’s test the DownSample class using Codeblock 8 below. The input tensors used here are exactly the same as the ones in Codeblock 6. Based on the resulting output, we can see that the pooling operation successfully converted the output of the DoubleConv block from 2×64×28×28 (#(1)) to 2×64×14×14 (#(2)), indicating that our DownSample class works properly.
# Codeblock 8
down_sample_test = DownSample(in_channels=1, out_channels=64).to(DEVICE)
x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)
out_test = down_sample_test(x_test, t_test)
# Codeblock 8 Output
original : torch.Size([2, 1, 28, 28])
timesteps : torch.Size([2]), tensor([468, 304], device='cuda:0')
after double conv : torch.Size([2, 64, 28, 28]) #(1)
after pooling : torch.Size([2, 64, 14, 14]) #(2)
The U-Net Architecture: Decoder
We need to introduce the so-called UpSample block in the decoder, which is responsible for reverting the tensors in the intermediate layers to the original image dimension. In order to maintain a symmetrical structure, the number of UpSample blocks must match that of the DownSample blocks. Look at Figure 7 below to see where the two UpSample blocks are placed.
Figure 7. The UpSample blocks [3].
Since both UpSample blocks are structurally identical, we can just initialize a single class for them, just like the DownSample class we created earlier. Look at Codeblock 9 below to see how I implement it.
# Codeblock 9
class UpSample(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv_transpose = nn.ConvTranspose2d(in_channels=in_channels,    #(1)
                                                 out_channels=out_channels,
                                                 kernel_size=2, stride=2)    #(2)
        self.double_conv = DoubleConv(in_channels=in_channels,    #(3)
                                      out_channels=out_channels)

    def forward(self, x, t, connection):    #(4)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')
        print(f'connection\t\t: {connection.size()}')

        x = self.conv_transpose(x)    #(5)
        print(f'\nafter conv transpose\t: {x.size()}')

        x = torch.cat([x, connection], dim=1)    #(6)
        print(f'after concat\t\t: {x.size()}')

        x = self.double_conv(x, t)    #(7)
        print(f'after double conv\t: {x.size()}')

        return x
In the __init__() method, we use nn.ConvTranspose2d to upsample the spatial dimension (#(1)). Both the kernel size and stride are set to 2 so that the output will be twice as large (#(2)). Next, the DoubleConv block will be employed to reduce the number of channels, while at the same time combining the timestep information from the time embedding tensor (#(3)).
The flow of this UpSample class is a bit more complicated than that of the DownSample class. If we take a closer look at the architecture, we’ll see that we also have a skip connection coming directly from the encoder. Thus, we need the forward() method to accept another argument in addition to the original image x and the timestep t, namely the residual tensor connection (#(4)). The first thing we do inside this method is process the original image x with the transpose convolution layer (#(5)). In fact, this layer not only upsamples the spatial size but also reduces the number of channels at the same time. However, the resulting tensor is then directly concatenated with connection in a channel-wise manner (#(6)), making it seem like no channel reduction is performed. It is important to know that at this point these two tensors are just concatenated, meaning that their information is not yet combined. We finally feed the concatenated tensors to the double_conv layer (#(7)), allowing them to share information with each other through the learnable parameters inside the convolution layers.
Codeblock 10 below shows how I test the UpSample class. The size of the tensors to be passed through is set according to the second upsampling block, i.e., the rightmost blue box in Figure 7.
# Codeblock 10
up_sample_test = UpSample(in_channels=128, out_channels=64).to(DEVICE)
x_test = torch.randn((BATCH_SIZE, 128, 14, 14)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)
connection_test = torch.randn((BATCH_SIZE, 64, 28, 28)).to(DEVICE)
out_test = up_sample_test(x_test, t_test, connection_test)
In the resulting output below, if we compare the input tensor (#(1)) with the final tensor shape (#(2)), we can clearly see that the number of channels was successfully reduced from 128 to 64, while at the same time the spatial dimension increased from 14×14 to 28×28. This essentially means that our UpSample class is now ready to be used in the main U-Net architecture.
# Codeblock 10 Output
original : torch.Size([2, 128, 14, 14]) #(1)
timesteps : torch.Size([2]), tensor([468, 304], device='cuda:0')
connection : torch.Size([2, 64, 28, 28])
after conv transpose : torch.Size([2, 64, 28, 28])
after concat : torch.Size([2, 128, 28, 28])
after double conv : torch.Size([2, 64, 28, 28]) #(2)
The U-Net Architecture: Putting All Components Together
Once all U-Net components have been created, what we are going to do next is wrap them together into a single class. Look at Codeblocks 11a and 11b below for the details.
# Codeblock 11a
class UNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.downsample_0 = DownSample(in_channels=NUM_CHANNELS,    #(1)
                                       out_channels=64)
        self.downsample_1 = DownSample(in_channels=64,    #(2)
                                       out_channels=128)
        self.bottleneck = DoubleConv(in_channels=128,    #(3)
                                     out_channels=256)
        self.upsample_0 = UpSample(in_channels=256,    #(4)
                                   out_channels=128)
        self.upsample_1 = UpSample(in_channels=128,    #(5)
                                   out_channels=64)
        self.output = nn.Conv2d(in_channels=64,    #(6)
                                out_channels=NUM_CHANNELS,
                                kernel_size=1)
You can see in the __init__() method above that we initialize two downsampling blocks (#(1–2)) and two upsampling blocks (#(4–5)), where the number of input and output channels is set according to the architecture shown in the illustration. There are actually two additional components I haven’t explained yet, namely the bottleneck (#(3)) and the output layer (#(6)). The former is essentially just a DoubleConv block which acts as the main connection between the encoder and the decoder. Look at Figure 8 below to see which components of the network belong to the bottleneck layer. Next, the output layer is a standard convolution layer responsible for turning the 64-channel image produced by the last UpSample stage into a 1-channel one. This operation is done using a kernel of size 1×1, meaning that it combines information across all channels while operating independently at each pixel position.
I guess the forward() method of the entire U-Net in the following codeblock is pretty straightforward, as what we essentially do here is pass the tensors from one layer to another; just don’t forget to include the skip connections between the downsampling and upsampling blocks.
# Codeblock 11b
    def forward(self, x, t):    #(1)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')

        convolved_0, maxpooled_0 = self.downsample_0(x, t)
        print(f'\nmaxpooled_0\t\t: {maxpooled_0.size()}')

        convolved_1, maxpooled_1 = self.downsample_1(maxpooled_0, t)
        print(f'maxpooled_1\t\t: {maxpooled_1.size()}')

        x = self.bottleneck(maxpooled_1, t)
        print(f'after bottleneck\t: {x.size()}')

        upsampled_0 = self.upsample_0(x, t, convolved_1)
        print(f'upsampled_0\t\t: {upsampled_0.size()}')

        upsampled_1 = self.upsample_1(upsampled_0, t, convolved_0)
        print(f'upsampled_1\t\t: {upsampled_1.size()}')

        x = self.output(upsampled_1)
        print(f'final output\t\t: {x.size()}')

        return x
Now let’s see whether we have correctly constructed the U-Net class above by running the following testing code.
# Codeblock 12
unet_test = UNet().to(DEVICE)
x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)
out_test = unet_test(x_test, t_test)
# Codeblock 12 Output
original : torch.Size([2, 1, 28, 28]) #(1)
timesteps : torch.Size([2]), tensor([468, 304], device='cuda:0')
maxpooled_0 : torch.Size([2, 64, 14, 14]) #(2)
maxpooled_1 : torch.Size([2, 128, 7, 7]) #(3)
after bottleneck : torch.Size([2, 256, 7, 7]) #(4)
upsampled_0 : torch.Size([2, 128, 14, 14])
upsampled_1 : torch.Size([2, 64, 28, 28])
final output : torch.Size([2, 1, 28, 28]) #(5)
We can see in the above output that the two downsampling stages successfully converted the original tensor of size 1×28×28 (#(1)) into 64×14×14 (#(2)) and 128×7×7 (#(3)), respectively. This tensor is then passed through the bottleneck layer, causing its number of channels to expand to 256 without changing the spatial dimension (#(4)). Lastly, we upsample the tensor twice before eventually shrinking the number of channels back to 1 (#(5)). Based on this output, it looks like our model is working properly. Thus, it is now ready to be trained for our diffusion task.
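As a quick optional sanity check (not required for the rest of the project), we can also count the trainable parameters of the unet_test instance we just created:

# count the trainable parameters of the U-Net instance from Codeblock 12
num_params = sum(p.numel() for p in unet_test.parameters() if p.requires_grad)
print(f'trainable parameters: {num_params}')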
Dataset Preparation
As we have successfully created the entire U-Net architecture, the next thing to do is prepare the MNIST Handwritten Digit dataset. Before actually loading it, we need to define the preprocessing steps using the transforms.Compose() method from Torchvision, as shown at line #(1) in Codeblock 13. There are two things we do here: converting the images into PyTorch tensors, which also scales the pixel values from 0–255 to 0–1 (#(2)), and normalizing them so that the final pixel values range between -1 and 1 (#(3)). Next, we download the dataset using datasets.MNIST() (#(4)). In this case, we are going to take the images from the training data, hence we use train=True (#(5)). Don’t forget to pass the transform variable we initialized earlier to the transform parameter (transform=transform) so that it automatically preprocesses the images as we load them (#(6)). Lastly, we employ DataLoader to load the images from mnist_dataset (#(7)). The arguments I use for the input parameters are intended to randomly pick BATCH_SIZE (2) images from the dataset in each iteration.
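To be precise, transforms.Normalize((0.5,), (0.5,)) subtracts 0.5 from each pixel value (already scaled to the 0–1 range by ToTensor()) and divides the result by 0.5:

x_{norm} = \frac{x - 0.5}{0.5}

so a pixel value of 0 maps to -1 and a pixel value of 1 maps to 1.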
# Codeblock 13
transform = transforms.Compose([    #(1)
    transforms.ToTensor(),    #(2)
    transforms.Normalize((0.5,), (0.5,))    #(3)
])

mnist_dataset = datasets.MNIST(    #(4)
    root='./data',
    train=True,    #(5)
    download=True,
    transform=transform    #(6)
)

loader = DataLoader(mnist_dataset,    #(7)
                    batch_size=BATCH_SIZE,
                    drop_last=True,
                    shuffle=True)
In the following codeblock, I try to load a batch of images from the dataset. In every iteration, loader provides both the images and the corresponding labels, hence we need to store them in two separate variables: images and labels.
# Codeblock 14
images, labels = next(iter(loader))
print('images\t\t:', images.shape)
print('labels\t\t:', labels.shape)
print('min value\t:', images.min())
print('max value\t:', images.max())
We can see in the resulting output below that the images tensor has the size of 2×1×28×28 (#(1)), indicating that two grayscale images of size 28×28 have been successfully loaded. Here we can also see that the length of the labels tensor is 2, which matches the number of loaded images (#(2)). Note that in this case the labels are going to be completely ignored. My plan here is that I just want the model to generate any digit it has previously seen in the training dataset, without even knowing what number it actually is. Lastly, this output also shows that the preprocessing works properly, as the pixel values now range between -1 and 1.
# Codeblock 14 Output
images : torch.Size([2, 1, 28, 28]) #(1)
labels : torch.Size([2]) #(2)
min value : tensor(-1.)
max value : tensor(1.)
Run the following code if you want to see what the image we just loaded looks like.
# Codeblock 15
plt.imshow(images[0].squeeze(), cmap='gray')
plt.show()

Noise Scheduler
In this section we are going to talk about how the forward and backward diffusion are performed, a process that essentially involves adding or removing noise little by little at each timestep. It is necessary to know that we basically want a uniform amount of noise across all timesteps: in the forward diffusion the image should be completely full of noise exactly at timestep 1000, while in the backward diffusion we have to get the completely clear image back at timestep 0. Hence, we need something to control the noise amount for each timestep. Later in this section, I am going to implement a class named NoiseScheduler to do so. This will probably be the most mathy section of this article, as I’ll display many equations here. But don’t worry about that, since we’ll focus on implementing these equations rather than discussing their mathematical derivations.
Now let’s take a look at the equations in Figure 10, which I will implement in the __init__() method of the NoiseScheduler class below.
Figure 10. The equations implemented in the NoiseScheduler class [3].
# Codeblock 16a
class NoiseScheduler:
    def __init__(self):
        self.betas = torch.linspace(BETA_START, BETA_END, NUM_TIMESTEPS)    #(1)
        self.alphas = 1. - self.betas
        self.alphas_cum_prod = torch.cumprod(self.alphas, dim=0)
        self.sqrt_alphas_cum_prod = torch.sqrt(self.alphas_cum_prod)
        self.sqrt_one_minus_alphas_cum_prod = torch.sqrt(1. - self.alphas_cum_prod)
The above code works by creating multiple sequences of numbers, all of which are basically controlled by BETA_START (0.0001), BETA_END (0.02), and NUM_TIMESTEPS (1000). The first sequence we need to instantiate is betas itself, which is done using torch.linspace() (#(1)). What it essentially does is generate a 1-dimensional tensor of length 1000, starting from 0.0001 and ending at 0.02, where every single element corresponds to a single timestep. The interval between each element is uniform, allowing us to generate a uniform amount of noise throughout all timesteps as well. With this betas tensor, we then compute alphas, alphas_cum_prod, sqrt_alphas_cum_prod and sqrt_one_minus_alphas_cum_prod based on the four equations in Figure 10. Later on, these tensors will act as the basis of how the noise is generated or removed during the diffusion process.
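Since Figure 10 is an image, here are the quantities it shows, reconstructed from the code above:

\alpha_t = 1 - \beta_t, \qquad \bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s, \qquad \sqrt{\bar{\alpha}_t}, \qquad \sqrt{1 - \bar{\alpha}_t}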
Diffusion is normally done in a sequential manner. However, since the noise added at every step is Gaussian, the forward diffusion process can be derived into a closed form, allowing us to obtain the noisy image at a specific timestep without having to iteratively add noise from the very beginning. Figure 11 below shows what the closed form of the forward diffusion looks like, where x₀ represents the original image while epsilon (ϵ) denotes an image made up of random Gaussian noise. We can think of this equation as a weighted combination, where we combine the clear image and the noise according to weights determined by the timestep, resulting in an image with a specific amount of noise.
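Written out, the closed-form forward diffusion shown in Figure 11 is:

x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon, \qquad \epsilon \sim \mathcal{N}(0, \mathbf{I})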
The implementation of this equation can be seen in Codeblock 16b. In this forward_diffusion() method, x₀ and ϵ are denoted as original and noise. Here you need to keep in mind that these two input variables are images, whereas sqrt_alphas_cum_prod_t and sqrt_one_minus_alphas_cum_prod_t are scalars. Thus, we need to adjust the shape of these two scalars (#(1) and #(2)) so that the operation at line #(3) can be performed. The noisy_image variable is going to be the output of this function, and I guess the name is self-explanatory.
# Codeblock 16b
    def forward_diffusion(self, original, noise, t):
        sqrt_alphas_cum_prod_t = self.sqrt_alphas_cum_prod[t]
        sqrt_alphas_cum_prod_t = sqrt_alphas_cum_prod_t.to(DEVICE).view(-1, 1, 1, 1)    #(1)

        sqrt_one_minus_alphas_cum_prod_t = self.sqrt_one_minus_alphas_cum_prod[t]
        sqrt_one_minus_alphas_cum_prod_t = sqrt_one_minus_alphas_cum_prod_t.to(DEVICE).view(-1, 1, 1, 1)    #(2)

        noisy_image = sqrt_alphas_cum_prod_t * original + sqrt_one_minus_alphas_cum_prod_t * noise    #(3)

        return noisy_image
Now let’s talk about backward diffusion. In fact, this one is a bit more complicated than the forward diffusion since we need three more equations here. Before I give you these equations, let me show you the implementation first. See the Codeblock 16c below.
# Codeblock 16c
    def backward_diffusion(self, current_image, predicted_noise, t):    #(1)
        denoised_image = (current_image - (self.sqrt_one_minus_alphas_cum_prod[t] * predicted_noise)) / self.sqrt_alphas_cum_prod[t]    #(2)
        denoised_image = 2 * (denoised_image - denoised_image.min()) / (denoised_image.max() - denoised_image.min()) - 1    #(3)

        current_prediction = current_image - ((self.betas[t] * predicted_noise) / (self.sqrt_one_minus_alphas_cum_prod[t]))    #(4)
        current_prediction = current_prediction / torch.sqrt(self.alphas[t])    #(5)

        if t == 0:    #(6)
            return current_prediction, denoised_image

        else:
            variance = (1 - self.alphas_cum_prod[t-1]) / (1. - self.alphas_cum_prod[t])    #(7)
            variance = variance * self.betas[t]    #(8)
            sigma = variance ** 0.5
            z = torch.randn(current_image.shape).to(DEVICE)
            current_prediction = current_prediction + sigma*z

            return current_prediction, denoised_image
Later in the inference phase, the backward_diffusion() method will be called inside a loop that iterates NUM_TIMESTEPS (1000) times, starting from t = 999, continuing with t = 998, and so on all the way to t = 0. This function is responsible for removing the noise from the image iteratively based on the current_image (the image produced by the previous denoising step), the predicted_noise (the noise predicted by the U-Net in the previous step), and the timestep information t (#(1)). In each iteration, noise removal is done using the equation shown in Figure 12, which in Codeblock 16c corresponds to lines #(4–5).
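Reconstructed from lines #(4–5), the noise removal equation in Figure 12 estimates the mean of the previous-timestep image as:

\mu_t = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}}\, \epsilon_\theta(x_t, t) \right)

where \epsilon_\theta(x_t, t) is the noise predicted by the U-Net.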
As long as we haven’t reached t = 0, we compute the variance based on the equation in Figure 13 (#(7–8)). This variance is then used to introduce another controlled noise to simulate the stochasticity of the backward diffusion process, since the noise removal equation in Figure 12 is a deterministic approximation. This is also the reason we don’t calculate the variance once we reach t = 0 (#(6)), since we no longer need to add more noise as the image is completely clear already.
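The corresponding variance from Figure 13, implemented at lines #(7–8), is:

\sigma_t^2 = \frac{1 - \bar{\alpha}_{t-1}}{1 - \bar{\alpha}_t}\, \beta_t

and the sample for the previous timestep is then obtained as x_{t-1} = \mu_t + \sigma_t z with z \sim \mathcal{N}(0, \mathbf{I}).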
Different from current_prediction, which aims to estimate the image of the previous timestep (xₜ₋₁), the objective of the denoised_image tensor is to reconstruct the original image (x₀). Because of these different objectives, we need a separate equation to compute denoised_image, which can be seen in Figure 14 below. The implementation of the equation itself is written at lines #(2–3).
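Reconstructed from line #(2), the equation in Figure 14 estimates the original image as:

\hat{x}_0 = \frac{x_t - \sqrt{1 - \bar{\alpha}_t}\, \epsilon_\theta(x_t, t)}{\sqrt{\bar{\alpha}_t}}

while line #(3) simply rescales this estimate back to the -1 to 1 range for visualization.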
Now let’s test the NoiseScheduler class we created above. In the following codeblock, I instantiate a NoiseScheduler object and print out the attributes associated with it, which are all computed using the equations in Figure 10 based on the values stored in the betas attribute. Remember that the actual length of these tensors is NUM_TIMESTEPS (1000), but here I only print out the first 6 elements.
# Codeblock 17
noise_scheduler = NoiseScheduler()
print(f'betas\t\t\t\t: {noise_scheduler.betas[:6]}')
print(f'alphas\t\t\t\t: {noise_scheduler.alphas[:6]}')
print(f'alphas_cum_prod\t\t\t: {noise_scheduler.alphas_cum_prod[:6]}')
print(f'sqrt_alphas_cum_prod\t\t: {noise_scheduler.sqrt_alphas_cum_prod[:6]}')
print(f'sqrt_one_minus_alphas_cum_prod\t: {noise_scheduler.sqrt_one_minus_alphas_cum_prod[:6]}')
# Codeblock 17 Output
betas : tensor([1.0000e-04, 1.1992e-04, 1.3984e-04, 1.5976e-04, 1.7968e-04, 1.9960e-04])
alphas : tensor([0.9999, 0.9999, 0.9999, 0.9998, 0.9998, 0.9998])
alphas_cum_prod : tensor([0.9999, 0.9998, 0.9996, 0.9995, 0.9993, 0.9991])
sqrt_alphas_cum_prod : tensor([0.9999, 0.9999, 0.9998, 0.9997, 0.9997, 0.9996])
sqrt_one_minus_alphas_cum_prod : tensor([0.0100, 0.0148, 0.0190, 0.0228, 0.0264, 0.0300])
The above output indicates that our __init__() method works as expected. Next, we are going to test the forward_diffusion() method. If you go back to Codeblock 16b, you will see that forward_diffusion() accepts three inputs: the original image, the noise image, and the timestep number. Let’s just use the image from the MNIST dataset we loaded earlier for the first input (#(1)) and a random Gaussian noise of the exact same size for the second one (#(2)). Run Codeblock 18 below to see what these two images look like.
# Codeblock 18
image = images[0] #(1)
noise = torch.randn_like(image) #(2)
plt.imshow(image.squeeze(), cmap='gray')
plt.show()
plt.imshow(noise.squeeze(), cmap='gray')
plt.show()

As we already have the image and the noise ready, what we need to do next is pass them to the forward_diffusion() method alongside t. I actually tried to run Codeblock 19 below multiple times with t = 50, 100, 150, and so on up to t = 300. You can see in Figure 16 that the image becomes less clear as the parameter increases. In this case, the image is going to be completely filled with noise when t is set to 999.
# Codeblock 19
noisy_image_test = noise_scheduler.forward_diffusion(image.to(DEVICE), noise.to(DEVICE), t=50)
plt.imshow(noisy_image_test[0].squeeze().cpu(), cmap='gray')
plt.show()

Unfortunately, we cannot test the backward_diffusion() method since this process requires us to have our U-Net model trained. So, let’s just skip this part for now. I’ll show you how we can actually use this function later in the inference phase.
Training
As the U-Net model, MNIST dataset, and the noise scheduler are ready, we can now prepare a function for training. Before we do that, I instantiate the model and the noise scheduler in Codeblock 20 below.
# Codeblock 20
model = UNet().to(DEVICE)
noise_scheduler = NoiseScheduler()
The entire training procedure is implemented in the train() function shown in Codeblock 21. Before doing anything else, we first initialize the optimizer and the loss function, which in this case are Adam and MSE, respectively (#(1–2)). What we basically want to do here is train the model such that it will be able to predict the noise contained in the input image; later on, the predicted noise will be used as the basis of the denoising process in the backward diffusion stage. To actually train the model, we first need to perform forward diffusion using the code at line #(6). This noising process will be done on the images tensor (#(3)) using the random noise generated at line #(4). Next, we take a random number somewhere between 0 and NUM_TIMESTEPS (1000) for t (#(5)), which is essentially done because we want our model to see images of varying noise levels as an approach to improve generalization. As the noisy images have been generated, we then pass them through the U-Net model alongside the chosen t (#(7)). The input t here is useful for the model as it indicates the current noise level in the image. Lastly, the loss function we initialized earlier is responsible for computing the difference between the actual noise and the predicted noise (#(8)). So, the objective of this training is basically to make the predicted noise as similar as possible to the noise we generated at line #(4).
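In other words, the training objective implemented below is the simplified loss from the DDPM paper [2]:

L_{simple} = \mathbb{E}_{x_0,\, \epsilon,\, t}\left[\, \big\| \epsilon - \epsilon_\theta\!\big(\sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon,\ t\big) \big\|^2 \right]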
# Codeblock 21
def train():
    optimizer = Adam(model.parameters(), lr=LEARNING_RATE)    #(1)
    loss_function = nn.MSELoss()    #(2)
    losses = []

    for epoch in range(NUM_EPOCHS):
        print(f'Epoch no {epoch}')

        for images, _ in tqdm(loader):
            optimizer.zero_grad()

            images = images.float().to(DEVICE)    #(3)
            noise = torch.randn_like(images)    #(4)
            t = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,))    #(5)

            noisy_images = noise_scheduler.forward_diffusion(images, noise, t).to(DEVICE)    #(6)
            predicted_noise = model(noisy_images, t)    #(7)
            loss = loss_function(predicted_noise, noise)    #(8)

            losses.append(loss.item())
            loss.backward()
            optimizer.step()

    return losses
Now let’s run the above training function using the codeblock below. Sit back and relax while waiting for the training to complete. In my case, I used a Kaggle Notebook with an Nvidia P100 GPU turned on, and it took around 45 minutes to finish.
# Codeblock 22
losses = train()
If we take a look at the loss graph, it seems like our model learned pretty well as the value is generally decreasing over time with a rapid drop at early stages and a more stable (yet still decreasing) trend in the later stages. So, I think we can expect good results later in the inference phase.
# Codeblock 23
plt.plot(losses)

Inference
At this point we have already got our model trained, so we can now perform inference on it. Look at Codeblock 24 below to see how I implement the inference() function.
# Codeblock 24
def inference():
    denoised_images = []    #(1)

    with torch.no_grad():    #(2)
        current_prediction = torch.randn((64, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)    #(3)

        for i in tqdm(reversed(range(NUM_TIMESTEPS))):    #(4)
            predicted_noise = model(current_prediction, torch.as_tensor(i).unsqueeze(0))    #(5)
            current_prediction, denoised_image = noise_scheduler.backward_diffusion(current_prediction, predicted_noise, torch.as_tensor(i))    #(6)

            if i%100 == 0:    #(7)
                denoised_images.append(denoised_image)

        return denoised_images
At the line marked with #(1) I initialize an empty list which will be used to store the denoising results every 100 timesteps (#(7)). This will later allow us to see how the backward diffusion goes. The actual inference process is encapsulated inside torch.no_grad() (#(2)). Remember that in diffusion models we generate images from completely random noise, which we assume to be at t = 999. To implement this, we can simply use torch.randn() as shown at line #(3). Here we initialize a tensor of size 64×1×28×28, indicating that we are about to generate 64 images simultaneously. Next, we write a for loop that iterates backwards from 999 to 0 (#(4)). Inside this loop, we feed the current image and the timestep as the input to the trained U-Net and let it predict the noise (#(5)). The actual backward diffusion is then performed at line #(6). At the end of the iterations, we should get new images similar to the ones in our dataset. Now let’s call the inference() function in the following codeblock.
# Codeblock 25
denoised_images = inference()
As the inference is complete, we can now see what the resulting images look like. Codeblock 26 below is used to display the first 42 images we just generated.
# Codeblock 26
fig, axes = plt.subplots(ncols=7, nrows=6, figsize=(10, 8))

counter = 0

for i in range(6):
    for j in range(7):
        axes[i,j].imshow(denoised_images[-1][counter].squeeze().detach().cpu().numpy(), cmap='gray')    #(1)
        axes[i,j].get_xaxis().set_visible(False)
        axes[i,j].get_yaxis().set_visible(False)
        counter += 1

plt.show()

If we take a look at the above codeblock, you can see that the [-1] indexer at line #(1) indicates that we only display the images from the last iteration (which corresponds to timestep 0). This is the reason that the images you see in Figure 18 are all free from noise. I do acknowledge that this might not be the best possible result since not all of the generated images are valid digits. But hey, this instead indicates that these images are not merely duplicates of the original dataset.
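If you also want to keep the generated digits on disk instead of only displaying them, a minimal sketch using torchvision’s utilities could look like the following (generated_digits.png is just an arbitrary filename I chose for this example):

from torchvision.utils import make_grid, save_image

final_images = denoised_images[-1]                       # the 64 images at timestep 0
grid = make_grid(final_images, nrow=8, normalize=True)   # arrange into an 8-column grid, rescale to [0, 1]
save_image(grid, 'generated_digits.png')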
Here we can also visualize the backward diffusion process using Codeblock 27 below. You can see in the resulting output in Figure 19 that we initially start from completely random noise, which gradually disappears as we move to the right.
# Codeblock 27
fig, axes = plt.subplots(ncols=10, figsize=(24, 8))

sample_no = 0
timestep_no = 0

for i in range(10):
    axes[i].imshow(denoised_images[timestep_no][sample_no].squeeze().detach().cpu().numpy(), cmap='gray')
    axes[i].get_xaxis().set_visible(False)
    axes[i].get_yaxis().set_visible(False)
    timestep_no += 1

plt.show()

Ending
There are plenty of directions you can go from here. First, you might need to tweak the parameter configurations in Codeblock 2 if you want better results. Second, it is also possible to modify the U-Net model by implementing attention layers in addition to the stack of convolution layers we used in the downsampling and upsampling stages. This does not guarantee better results, especially for a simple dataset like this, but it’s definitely worth trying. Third, you can also use a more complex dataset if you want to challenge yourself.
When it comes to practical applications, there are actually lots of things you can do with diffusion models. The simplest one might be data augmentation. With a diffusion model, we can easily generate new images from a specific data distribution. For example, suppose we are working on an image classification project, but the number of images in each class is imbalanced. To address this problem, we can take the images from the minority class, train a diffusion model on them, and then ask the trained model to generate as many new samples from that class as we want.
And well, that’s pretty much everything about the theory and implementation of diffusion models. Thanks for reading, I hope you learned something new today!
You can access the code used in this project through this link. Here are also the links to my previous articles about Autoencoder, Variational Autoencoder (VAE), Neural Style Transfer (NST), and Transformer.
References
[1] Jascha Sohl-Dickstein et al. Deep Unsupervised Learning using Nonequilibrium Thermodynamics. Arxiv. https://arxiv.org/pdf/1503.03585 [Accessed December 27, 2024].
[2] Jonathan Ho et al. Denoising Diffusion Probabilistic Models. Arxiv. https://arxiv.org/pdf/2006.11239 [Accessed December 27, 2024].
[3] Image created originally by author.
[4] Olaf Ronneberger et al. U-Net: Convolutional Networks for Biomedical Image Segmentation. Arxiv. https://arxiv.org/pdf/1505.04597 [Accessed December 27, 2024].
[5] Yann LeCun et al. The MNIST Database of Handwritten Digits. https://yann.lecun.com/exdb/mnist/ [Accessed December 30, 2024] (Creative Commons Attribution-Share Alike 3.0 license).
[6] Ashish Vaswani et al. Attention Is All You Need. Arxiv. https://arxiv.org/pdf/1706.03762 [Accessed September 29, 2024].