해당 논문의 paper review : cumulu-s.tistory.com/24

 

2. Auto-Encoding Variational Bayes (VAE) - paper review

두 번째로 다뤄볼 논문은 VAE라는 이름으로 잘 알려진 Auto-Encoding Variational Bayes라는 논문입니다. 너무 유명한 논문이라 다양한 자료들이 많이 있지만, 가급적이면 다른 분들이 가공해서 만든 자료

cumulu-s.tistory.com

 

이번 글에서는 VAE를 실제로 구현한 코드와, 각 코드가 논문에서 어떤 것과 대응되는지를 자세히 알아보겠습니다.

 

 

paper review에서는 수많은 수식들이 나왔지만, 막상 구현에서는 그렇게까지 복잡하지 않습니다.

 

 

그럼 시작해보겠습니다.

 

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from tqdm.auto import tqdm
from torch.nn import functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import datetime
import os

먼저, 필요한 패키지들을 불러옵니다.

 

이번에는 pytorch에서 지원하는 tensorboard라는 기능을 사용해서 loss function을 plotting 해보고, 생성되는 이미지들을 살펴보려고 합니다.

 

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
print("사용하는 Device : ", DEVICE)

current_time = datetime.datetime.now() + datetime.timedelta(hours= 9)
current_time = current_time.strftime('%Y-%m-%d-%H:%M')

saved_loc = os.path.join('/content/drive/MyDrive/VAE_Result', current_time)
os.mkdir(saved_loc)

print("저장 위치: ", saved_loc)

writer = SummaryWriter(saved_loc)
EPOCHS = 50
BATCH_SIZE = 200

저번 GAN 코드에서와 달리, 저장 위치를 지정해주는 코드를 추가했습니다.

 

tensorboard를 사용하면, 코드가 돌아가면서 loss나 이미지가 찍히는데 이를 돌릴때마다 결과를 나눠서 가지고 있어야 하기 때문에 폴더를 별도로 만드는 방법을 사용했습니다.

 

datetime.datetime.now()를 치면 현재 시간이 나오는데, Google Colab의 경우는 한국 시간보다 9시간 느리기 때문에 datetime.timedelta(hours = 9)를 통해서 9시간을 더해 실제 한국 시간과 시차를 맞춰줍니다.

 

그리고 .strftime을 사용하면 원하는 형태로 출력이 되도록 만들 수 있습니다.

 

/content/drive/MyDrive/VAE_Result <= 이거는 제가 사용하는 경로이므로, 원하시는 경로로 바꿔서 지정해주시면 됩니다.

 

os.mkdir는 원하는 경로에 폴더를 생성하는 코드입니다. 이렇게 해야 돌릴 때마다 폴더가 새로 만들어지고, 만들어진 폴더에 tensorboard를 만드는데 필요한 데이터들이 저장되게 됩니다.

 

writer = SummaryWriter 를 이용해서 tensorboard를 써주는 객체를 만들어줍니다.

 

# Transformer code
transformer = transforms.Compose([
            transforms.ToTensor()
])


# Loading trainset, testset and trainloader, testloader
trainset = torchvision.datasets.MNIST(root = '/content/drive/MyDrive/MNIST', train = True,
                                        download = True, transform = transformer)

trainloader = torch.utils.data.DataLoader(trainset, batch_size = BATCH_SIZE, shuffle = True, num_workers = 2)


testset = torchvision.datasets.MNIST(root = '/content/drive/MyDrive/MNIST', train = False,
                                        download = True, transform = transformer)

testloader = torch.utils.data.DataLoader(testset, batch_size = BATCH_SIZE, shuffle = True, num_workers = 2)

transformer에는 numpy array를 pytorch tensor로 만들어주는 역할을 하는 transforms.ToTensor()만 투입해줍니다.

 

 

# sample check
sample, label = next(iter(trainloader))

# show grid image
def imshow_grid(img):
    img = torchvision.utils.make_grid(img)
    print(type(img))
    print(img.shape)
    plt.imshow(img.permute(1, 2, 0))
    ax = plt.gca()
    ax.axes.xaxis.set_visible(False)
    ax.axes.yaxis.set_visible(False)
    plt.show()


imshow_grid(sample[0:8])

 

해당 코드는 샘플로 8개의 이미지만 grid 형식으로 만들어서 plotting 하는 코드입니다.

 

 

# VAE model
class VAE(nn.Module):
    def __init__(self, image_size, hidden_size_1, hidden_size_2, latent_size):
        super(VAE, self).__init__()

        self.fc1 = nn.Linear(image_size, hidden_size_1)
        self.fc2 = nn.Linear(hidden_size_1, hidden_size_2)
        self.fc31 = nn.Linear(hidden_size_2, latent_size)
        self.fc32 = nn.Linear(hidden_size_2, latent_size)

        self.fc4 = nn.Linear(latent_size, hidden_size_2)
        self.fc5 = nn.Linear(hidden_size_2, hidden_size_1)
        self.fc6 = nn.Linear(hidden_size_1, image_size)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        return self.fc31(h2), self.fc32(h2)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + std * eps

    def decode(self, z):
        h3 = F.relu(self.fc4(z))
        h4 = F.relu(self.fc5(h3))
        return torch.sigmoid(self.fc6(h4))

    def forward(self, x):
        mu, logvar = self.encode(x.view(-1, 784))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

 

다음으로는, 가장 핵심이 되는 VAE model에 해당하는 class에 대해서 알아보겠습니다.

 

먼저 논문에서 $p_\phi(z|x)$인 Encoder(혹은 Recognition model)부터 보겠습니다.

 

Encoder는 이미지를 받아서, $\mu$와 $\sigma$를 출력해야 합니다.

 

따라서 이 작업이 self.fc1과 self.fc2, self.fc31 / self.fc32를 통해서 이루어집니다.

 

self.fc1는 이미지를 받아서 hidden space_1만큼으로 출력해내고, self.fc2는 hidden space_1만큼의 데이터를 받아서 hidden space_2만큼의 데이터를 뽑아냅니다. self.fc31와 self.fc32는 hidden space_2만큼의 데이터를 받아서 latent space 차원만큼의 $\mu$와 $\sigma$를 출력해냅니다.

 

이 작업은 encode라는 이름의 함수로 만들어 해당 작업이 이루어지게 코드가 짜져있습니다.

 

다음으로는, $\mu$와 $\sigma$가 주어졌을 때, latent variable $z$를 만들어야 합니다.

 

이 작업은 reparameterization이라는 이름으로 논문에서 소개되었고, 코드상에서는 reparameterize라는 함수로 구현되었습니다.

 

근데 잘 보면, 이 함수는 mu와 logvar를 받는다는 것을 확인할 수 있습니다.

 

갑자기 logvar는 무엇을 의미하는 것일까요?

 

이는 표준편차 값이 음수가 되지 않도록 만들기 위해서, encoder에서 나온 self.fc32의 결괏값을 $log\sigma^2$로 생각합니다. 

 

그래서 변수 이름이 logvar(log 분산)인 것이죠.

 

정리해보면, 이렇게 정리할 수 있습니다.

 

$\sigma = e^{log\sigma} = e^{2log\sigma/2} = e^{log\sigma^2/2} = e^{logvar/2}$가 됩니다.

 

이렇게 만들면, $\sigma$의 값이 $e^{logvar/2}$로 구현되기 때문에, 무조건 양수 값이 되게 됩니다. 

 

다시 reparameterize 함수로 돌아가면, 입력으로 받은 mu와 logvar를 가지고,

 

std는 torch.exp(0.5 * logvar)로 만들어주고, eps는 torch.randn_like(std)로 만들어줍니다.

 

torch.randn_like는 입력과 동일한 모양으로 평균이 0이고 분산이 1인 정규분포에서 숫자를 뽑아 tensor를 만들어줍니다.

 

이를 논문에서 나온 것처럼, mu + std * eps로 구현합니다.

 

논문에서 나온 z = mu + std * eps

이것의 결과를 z라는 이름으로 latent vector로 만들어주고, 이것을 decoding 한 결과와 mu, logvar를 출력으로 내보냅니다.

 

self.decode(z)를 해서 나오는 결과는, torch.sigmoid(self.fc6(h4))이므로 이미지의 사이즈와 동일한 차원을 가지면서 sigmoid를 통과해 0부터 1 사이의 값입니다. 

 

이는 우리가 input으로 투입시킨 이미지를 VAE를 통해서 복원한 이미지라고 볼 수 있습니다.

 

VAE_model = VAE(28*28, 512, 256, 2).to(DEVICE)
optimizer = optim.Adam(VAE_model.parameters(), lr = 1e-3)

다음은 VAE 모델을 VAE_model이라는 이름의 객체로 만들어주고, Adam optimizer를 만들어줍니다.

 

hidden space_1의 수는 512로 잡았고, hidden space_2의 수는 256, latent space의 차원은 2차원으로 지정하였습니다.

 

 

def loss_function(recon_x, x, mu, logvar):
    BCE = F.binary_cross_entropy(recon_x, x.view(-1, 784), reduction = 'sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE, KLD

다음 코드는 loss function을 계산하는 코드입니다.

 

먼저 논문에서 나온 loss 값(variational lower bound)을 살펴봅시다.

 

 

첫 번째 term은 KL-Divergence라는 얘기를 했었고, 두 번째 term은 reconstruction error입니다.

 

먼저, 논문의 Appendix B를 통해서 어떻게 이것이 도출되었는지 보겠습니다.

 

prior가 $p_\theta(z) = N(0, I)$를 만족시키고, posterior approximation이 $q_\phi(z|x^{(i)})$가 Gaussian이면 이렇게 풀 수 있다고 합니다.

 

맨 마지막을 보시면, - KL term을 한 결과가 바로 1/2로 시작하는 식임을 확인할 수 있습니다.

 

KL Divergence는 항상 양수이므로, - KL term은 항상 음수가 될 것입니다.

 

loss를 계산하고자 하는 것이므로, 실제 코드에서는 여기에 - 를 곱해주는 식으로 계산한 것으로 보입니다.

 

따라서 코드에서는 - 0.5 * torch.sum(1 + logvar - mu.pow(2) -logvar.exp())로 계산됩니다.

 

 

 

 

두 번째 reconstruction error항은 어떻게 구성되는지 Appendix C를 보면 알 수 있습니다.

 

우리 모델의 경우, decoder를 통해서 나오는 값이 sigmoid activation function을 통과해서 나오기 때문에 Bernoulli distribution을 따른다고 생각할 수 있습니다.

 

논문의 Appendix C에서는, $p_\theta(x|z)$가 multivariate Bernoulli 분포일 때, 다음과 같이 계산하라고 나와있습니다.

 

따라서, 이를 F.binary_cross_entropy()를 이용해서 계산하였습니다.

 

원래 코드에서는 BCE와 KLD를 더하도록 되어 있는데, 저는 BCE와 KLD의 학습 / 테스트 그래프를 그리기 위해서 두 값이 합쳐져서 나오는 게 아니라 별도로 output으로 나오도록 코드를 짰습니다.

 

 

def train(epoch, model, train_loader, optimizer):
    model.train()
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(DEVICE)
        optimizer.zero_grad()

        recon_batch, mu, logvar = model(data)

        BCE, KLD = loss_function(recon_batch, data, mu, logvar)

        loss = BCE + KLD

        writer.add_scalar("Train/Reconstruction Error", BCE.item(), batch_idx + epoch * (len(train_loader.dataset)/BATCH_SIZE) )
        writer.add_scalar("Train/KL-Divergence", KLD.item(), batch_idx + epoch * (len(train_loader.dataset)/BATCH_SIZE) )
        writer.add_scalar("Train/Total Loss" , loss.item(), batch_idx + epoch * (len(train_loader.dataset)/BATCH_SIZE) )

        loss.backward()

        train_loss += loss.item()

        optimizer.step()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\t Loss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                loss.item() / len(data)))
            
    print("======> Epoch: {} Average loss: {:.4f}".format(
        epoch, train_loss / len(train_loader.dataset)
    ))        

다음으로는 학습 코드입니다.

 

VAE에서는 label이 필요 없으므로, train_loader에서 나오는 label을 _로 표기해서 별도로 저장하지 않습니다.

 

data를 model에 넣으면, 아까 위에서 봤듯이 reconstruction 된 이미지와 mu, logvar가 나오게 됩니다.

 

그리고 이 값들을 이용해서 loss_function에 투입하여 loss 값을 계산합니다.

 

최종적인 loss 값은 BCE + KLD가 될 것이며, 이를 backpropagation 시켜줍니다.

 

중간에 writer.add_scalar라는 함수가 보이는데, 이는 tensorboard에 값을 저장하기 위해 추가된 코드입니다.

 

첫 번째인 "Train/Reconstruction Error"는 어떤 이름을 가지는 곳에 저장할 것인지를 정하는 것이고

 

두 번째인 BCE.item()은 tensorboard에 저장하려고 하는 값 그 자체를 나타냅니다.

 

그리고 세 번째 인자는 그래프를 그려줄 때 x축에 해당하는 값을 나타냅니다. 

 

batch_idx가 배치의 index이므로, 여기에 epoch이 지날 때마다 추가적으로 더 더해줘서

 

실제로 몇 번째의 batch인지를 x축의 값으로 넣도록 하였습니다.

 

MNIST의 경우, train data가 6만 장이므로, batch size = 200 기준으로는 1 epoch 당 300번이 그래프에 찍히게 됩니다.

 

해당 코드에서는 학습을 50 epochs 만큼 진행하므로, train 데이터에서는 x축의 값이 0부터 15000까지 찍힐 것입니다.

 

 

def test(epoch, model, test_loader):
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for batch_idx, (data, _) in enumerate(test_loader):
            data = data.to(DEVICE)
            
            recon_batch, mu, logvar = model(data)
            BCE, KLD = loss_function(recon_batch, data, mu, logvar)

            loss = BCE + KLD

            writer.add_scalar("Test/Reconstruction Error", BCE.item(), batch_idx + epoch * (len(test_loader.dataset)/BATCH_SIZE) )
            writer.add_scalar("Test/KL-Divergence", KLD.item(), batch_idx + epoch * (len(test_loader.dataset)/BATCH_SIZE) )
            writer.add_scalar("Test/Total Loss" , loss.item(), batch_idx + epoch * (len(test_loader.dataset)/BATCH_SIZE) )
            test_loss += loss.item()

            if batch_idx == 0:
                n = min(data.size(0), 8)
                comparison = torch.cat([data[:n], recon_batch.view(BATCH_SIZE, 1, 28, 28)[:n]]) # (16, 1, 28, 28)
                grid = torchvision.utils.make_grid(comparison.cpu()) # (3, 62, 242)
                writer.add_image("Test image - Above: Real data, below: reconstruction data", grid, epoch)

다음으로는 test 코드입니다.

 

train과 다른 지점은 if batch_idx == 0:에 해당하는 부분일 것 같습니다.

 

torch.cat을 통해서 test_loader에서 나오는 데이터 8개와, 이를 VAE 모델을 통해서 뽑아낸 reconstruction 결과 8개를 합쳐줍니다. 이는 (16, 1, 28, 28)로 나오게 됩니다.

 

그리고 이를 표현하기 위해서 make_grid 함수를 이용해서 grid 이미지를 만들어주고, 이것을 writer.add_image를 통해 tensorboard에 그려줍니다.

 

def latent_to_image(epoch, model):
    with torch.no_grad():
        sample = torch.randn(64, 2).to(DEVICE)
        recon_image = model.decode(sample).cpu()
        grid = torchvision.utils.make_grid(recon_image.view(64, 1, 28, 28))
        writer.add_image("Latent To Image", grid, epoch)

 

latent_to_image는 latent variable을 이미지로 만들어주는 함수입니다.

 

여기에서는 gradient를 계산하지 않으므로 torch.no_grad()를 먼저 선언해줍니다.

 

torch.randn(64, 2)으로 64개의 2차원짜리 랜덤 값들을 만들어냅니다.

 

그리고 이 값을 model.decode를 통해서 reconstruction 이미지를 만들어줍니다.

 

reconstruction 된 이미지를 grid 이미지로 만들어주고, 이를 또다시 writer.add_image를 사용해서 tensorboard에 그려줍니다.

 

해당 함수를 통해서 평균 0, 분산 1인 Gaussian distribution을 가지는 데이터가 주어졌을 때, 어떻게 이미지로 복원하는지를 확인할 수 있습니다.

 

논문에서 Decoder라고 부르던 $p_\theta(x|z)$ 부분이 잘 기능하는지를 확인해보는 것이라고 보시면 되겠습니다.

 

for epoch in tqdm(range(0, EPOCHS)):
    train(epoch, VAE_model, trainloader, optimizer)
    test(epoch, VAE_model, testloader)
    print("\n")
    latent_to_image(epoch, VAE_model)

writer.close()

얼마나 남았는지 확인해주기 위해서 tqdm을 달아주고, range(0, EPOCHS)를 지정해서 총 50 epochs 동안 학습이 진행되도록 해줍니다.

 

train (VAE 모델 학습) => test (VAE 모델이 잘 복원하는지를 확인) => latent_to_image (잠재 변수가 주어졌을 때 image generation을 잘하는지 확인)을 해주고, epoch을 다 돌면 writer.close()를 통해 tensorboard에 써주는 writer를 닫아줍니다.

 

 

%load_ext tensorboard

%tensorboard --logdir='/content/drive/MyDrive/VAE_Result/2021-03-15-15:39'

해당 코드는 Google colab에서 사용할 때 적용하는 코드입니다.

 

그리고 해당 경로는 제가 돌린 결과를 저장한 폴더의 위치라고 보시면 되겠습니다.

 

다음으로는 해당 코드를 돌렸을 때 얻게 되는 결과들에 대해서 살펴보겠습니다.

 

먼저 Train입니다.

 

Total Loss = KL-Divergence + Reconstruction Error로 계산되며, Reconstruction Error는 감소하고 있고 KL-Divergence는 계속해서 증가하는 모습을 보이고 있습니다.

 

Total Loss 관점에서는 계속해서 떨어지고 있네요.

 

Reconstruction Error가 감소하는 것을 보면, 기존 이미지를 잘 구현해나가고 있구나 라고 생각해볼 수 있습니다.

 

하지만, KL-Divergence가 계속해서 증가하는 모습을 보이고 있는데, 이는 $q_\phi(z|x^{(i)})$가 $p_\theta(z)$와 가까워지지 못하고 있다는 사실을 알 수 있습니다. 

 

논문에서 KL-Divergence term이 결국 $\phi$를 regularization 하는 역할이라고 했었는데, Encoder의 parameter들이 적절하게 학습되지 못하고 있는 것 같습니다.

 

 

Test에서도 크게 다르지 않은 모습을 보이고 있습니다.

 

여전히 KL-Divergence는 증가하고 있고, Reconstruction Error는 감소하여 Total Loss는 감소하고 있는 모습을 보입니다.

 

 

다음으로는 이미지들을 살펴보겠습니다.

 

 

위에 3가지 사진은, Test image가 주어졌을 때 실제 Test image 8개와(위쪽) 이를 model의 input으로 투입했을 때 얻게 되는 reconstruction image 8개를 보여줍니다.

 

 

첫 번째는 0 epoch일 때, 두 번째는 23 epoch일 때, 세 번째는 49 epoch일 때입니다. (사진에서 보이는 오렌지색 동그라미를 움직이면 해당하는 epoch의 이미지를 볼 수 있습니다. interactive 한 화면이라서 처음 중간 끝 정도의 3가지만 보여드립니다.) 

 

 

생각보다는 엄청 깔끔하게 생성하는 것 같지는 않습니다.

 

 

다음으로는 latent variable이 제공되었을 때, 어떻게 이미지를 generation 하는지를 살펴보겠습니다.

 

 

맨 처음 단계에서는 알 수 없는 이미지들이 대부분이지만, 갈수록 원래 데이터에서 보일법한 이미지들이 뚜렷하게 보입니다.

 

엄청 깔끔하진 않지만, 대부분 조금 blur하나 그래도 이게 어떤 숫자인지 인지할 수 있을 정도의 퀄리티는 나오고 있네요.

 

 

위에서 실행했던 코드들을 보고 싶으시다면 아래 주소의 제 깃헙으로 와주시면 됩니다.

 

 

github.com/PeterKim1/paper_code_review

 

 

 

여기까지 Auto-Encoding Variational Bayes(VAE)의 code review를 마치겠습니다.

 

 

감사합니다.

 

 

 

 

 

 

 

 

 

 

+ Recent posts