이번 글에서는 지난 글에서 review한 AAE 논문을 코드로 구현한 내용을 살펴보겠습니다.

 

 

AAE 논문의 paper review : cumulu-s.tistory.com/26

 

3. Adversarial Autoencoders(AAE) - paper review

오늘은 Autoencoder의 구조와 GAN framework를 결합해 탄생한 AAE에 대해서 알아보도록 하겠습니다. paper : arxiv.org/abs/1511.05644 Adversarial Autoencoders In this paper, we propose the "adversarial aut..

cumulu-s.tistory.com

 

AAE 논문에서는, AAE를 활용할 수 있는 다양한 applications을 제시하였는데 일단 저는 가장 기초적인 AAE를 짜봤습니다.

 

 

hyperparameter도 바꿔보면서, 수십번의 시도를 하였음에도 논문에서 나온대로 Encoder가 manifold learning을 하지 못해서 코드가 완벽하다고 하기는 어려울 것 같습니다. (GAN에서 discriminator와 generator가 적절한 수준에서 학습시키기가 어려운 것 때문으로 추측하고 있습니다. 이럴땐 공식 코드가 있고 없고의 차이가 크다고 느껴지네요 ㅠㅠ)

 

 

대략 이런식으로 코드를 짜보면 되겠구나 정도로 참고해주시면 되겠습니다.

 

 

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as dsets
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch.utils.tensorboard import SummaryWriter
import datetime
import os
import torchvision
import itertools
import numpy as np
from math import sin,cos,sqrt

먼저 필요한 package들을 import 해줍니다.

 

 

current_time = datetime.datetime.now() + datetime.timedelta(hours= 9)
current_time = current_time.strftime('%Y-%m-%d-%H:%M')

saved_loc = os.path.join('/content/drive/MyDrive/AAE_Result', current_time)
os.mkdir(saved_loc)

print("저장 위치: ", saved_loc)

writer = SummaryWriter(saved_loc)

 

이 부분은 이전 코드 리뷰에서도 이미 다뤘던 부분이라서, 넘어가도록 하겠습니다.

 

직접 돌리실 때는 경로를 원하시는 위치로 바꿔주시면 됩니다.

 

 

# MNIST Dataset 
dataset = dsets.MNIST(root='/content/drive/MyDrive/MNIST', 
                      train=True, 
                      transform=transforms.ToTensor(),  
                      download=True)

# Data Loader (Input Pipeline)
data_loader = torch.utils.data.DataLoader(dataset=dataset, 
                                          batch_size=200, 
                                          shuffle=True)

testset = dsets.MNIST(root='/content/drive/MyDrive/MNIST', 
                      train=False, 
                      transform=transforms.ToTensor(),  
                      download=True)

testloader = torch.utils.data.DataLoader(dataset=testset,
                                         batch_size = 200,
                                         shuffle = True)

사용할 데이터셋인 MNIST를 불러와줍니다.

 

 

 

def to_var(x):
    if torch.cuda.is_available():
        x = x.cuda()
    return Variable(x) 

이 함수는, x 라는 변수를 GPU 사용이 가능할 때, cuda로 옮겨주고 이를 Variable로 만들어주는 코드입니다.

 

 

#Encoder
class Q_net(nn.Module):  
    def __init__(self,X_dim,N,z_dim):
        super(Q_net, self).__init__()
        self.lin1 = nn.Linear(X_dim, N)
        self.lin2 = nn.Linear(N, N)
        self.lin3gauss = nn.Linear(N, z_dim)

        self.lin1.weight.data.normal_(0, 0.01)
        self.lin2.weight.data.normal_(0, 0.01)
        self.lin3gauss.weight.data.normal_(0, 0.01)
    def forward(self, x):
        x = F.dropout(self.lin1(x), p=0.25, training=self.training)
        x = F.relu(x)
        x = F.dropout(self.lin2(x), p=0.25, training=self.training)
        x = F.relu(x)
        xgauss = self.lin3gauss(x)
        return xgauss

다음으로는 Encoder 부분입니다. X_dim은 이미지의 사이즈가 될 것이고, N은 중간 hidden layer의 node 수 입니다. z_dim는 latent space의 차원이구요.

 

논문의 Appendix A에서 'The weights are initialized with a Gaussian distribution with the standard deviation of 0.01.'라고 적혀 있으므로, weight.data.normal_(0, 0.01)을 이용해서 초기화를 해줍니다.

 

그리고 'The activation of the last lyaer of $q(z|x)$ is linear'라고 적혀 있으므로, 마지막 layer에는 activation function을 따로 만들지 않습니다.

 

# Decoder
class P_net(nn.Module):  
    def __init__(self,X_dim,N,z_dim):
        super(P_net, self).__init__()
        self.lin1 = nn.Linear(z_dim, N)
        self.lin2 = nn.Linear(N, N)
        self.lin3 = nn.Linear(N, X_dim)

        self.lin1.weight.data.normal_(0, 0.01)
        self.lin2.weight.data.normal_(0, 0.01)
        self.lin3.weight.data.normal_(0, 0.01)

    def forward(self, x):
        x = F.dropout(self.lin1(x), p=0.25, training=self.training)
        x = F.relu(x)
        x = F.dropout(self.lin2(x), p=0.25, training=self.training)
        x = F.relu(x)
        x = self.lin3(x)
        return torch.sigmoid(x)

 

다음은 Decoder 입니다. Decoder도 동일하게 weight 초기화를 적용해주고, 이번에는 마지막 layer의 결과가 이미지와 동일하게 0부터 1사이의 값이 나와야하므로 sigmoid를 마지막에 적용해줍니다.

 

 

# Discriminator
class D_net_gauss(nn.Module):  
    def __init__(self,N,z_dim):
        super(D_net_gauss, self).__init__()
        self.lin1 = nn.Linear(z_dim, N)
        self.lin2 = nn.Linear(N, N)
        self.lin3 = nn.Linear(N, 1)

        self.lin1.weight.data.normal_(0, 0.01)
        self.lin2.weight.data.normal_(0, 0.01)
        self.lin3.weight.data.normal_(0, 0.01)
    def forward(self, x):
        x = F.dropout(self.lin1(x), p=0.2, training=self.training)
        x = F.relu(x)
        x = F.dropout(self.lin2(x), p=0.2, training=self.training)
        x = F.relu(x)
        return torch.sigmoid(self.lin3(x))  

 

다음 코드는 Discriminator입니다. latent vector를 입력으로 받아, 해당 latent vector가 진짜인지 가짜인지 판별하는 역할을 하죠. 

 

 

 

EPS = 1e-15
z_red_dims = 2
Q = Q_net(784,1000,z_red_dims).cuda()
P = P_net(784,1000,z_red_dims).cuda()
D_gauss = D_net_gauss(1000,z_red_dims).cuda()

EPS는 매우 작은 숫자로 설정되어 있고, latent dimension은 2로 설정했습니다.

 

그리고 논문의 Appendix A에 'The encoder, decoder and discriminator each have two layers of 1000 hidden units with ReLU activation function'으로 적혀 있으므로, hidden layer의 node는 1000개로 지정하였습니다.

 

 

# Set learning rates
gen_lr = 0.0001
reg_lr = 0.00005

# encode/decode optimizers
optim_P = torch.optim.Adam(P.parameters(), lr=gen_lr)
optim_Q_enc = torch.optim.Adam(Q.parameters(), lr=gen_lr)
# regularizing optimizers
optim_Q_gen = torch.optim.Adam(Q.parameters(), lr=reg_lr) 
optim_D = torch.optim.Adam(D_gauss.parameters(), lr=reg_lr)
    
data_iter = iter(data_loader)
iter_per_epoch = len(data_loader)
total_step = 60000 

Reconstruction error를 최소화하는 상황에서 Encoder와 Decoder는 0.0001의 learning rate를 가지고 학습하도록 설계하였으며, Regularization error를 최소화하는 상황에서 Discriminator와 generator는 0.00005의 learning rate를 가지고 학습하도록 설계하였습니다.

 

data_iter라는 이름으로 data_loader를 iterator로 만들어주고, 전체 training step은 6만번 진행합니다. (batch size가 100개라서, 600번 진행하면 1 epoch이며, 6만번 진행하면 100 epoch이 됩니다.)

 

iter_per_epoch는 600이 나옵니다.

 

 

for step in range(total_step):

    # Reset the data_iter
    if (step+1) % iter_per_epoch == 0:
        data_iter = iter(data_loader)

    # Fetch the images and labels and convert them to variables
    images, labels = next(data_iter)
    images, labels = to_var(images.view(images.size(0), -1)), to_var(labels)

 

다음으로는 반복문 부분을 보겠습니다.

 

앞에서 iterator를 정의했었는데, 만약 600번(1 에폭)이 지나면 새롭게 다시 iterator를 reset 해줍니다.

 

그리고 iterator는 next()라는 명령어를 통해서 한개씩 데이터를 빼낼 수 있기 때문에 next(data_iter)를 적용해줍니다.

 

next를 통해서 빼넨 이미지는 (100, 1, 28, 28)의 형태를 가지고 있는데 앞에서 정의한 Encoder는 (Batch_size, 28*28)의 형태를 input으로 받습니다. 따라서, images.view(images.size(0), -1)를 통해서 (100, 784) 사이즈로 맞춰줍니다.

 

딥러닝에서는 항상 shape가 매우 중요하니까, 코드를 보실 때 이런 부분을 염두에 두시고 보시는게 좋다고 생각합니다.

 

 

    # ======================================== #
    # =====Phase 1 : Reconstruction Loss====== #
    # ======================================== #
    
    P.zero_grad()
    Q.zero_grad()
    D_gauss.zero_grad()

    z_sample = Q(images)   #encode to z
    X_sample = P(z_sample) #decode to X reconstruction
    recon_loss = F.binary_cross_entropy(X_sample+EPS, images+EPS, reduction = 'sum')

    writer.add_scalar("Train/Reconstruction_loss", recon_loss.item() / len(images), step)

    recon_loss.backward()
    optim_P.step()
    optim_Q_enc.step()

다음으로는 Loss를 계산하고, backprop 시켜보겠습니다.

 

먼저, Encoder와 Decoder, Discriminator를 모두 zero_grad() 해줍니다.

 

z_sample은 이미지를 latent vector로 바꾼 것을 저장한 것이고, X_sample은 latent vector를 이용해 이미지를 복원한 결과입니다.

 

그리고 recon_loss는 X_sample과 원래 이미지를 binary_cross_entropy를 이용해서 loss 값을 계산해줍니다.

 

writer.add_scalar는 reconstruction loss 값을 tensorboard에 write하는 코드구요.

 

계산된 recon_loss에 대해서 .backward()를 해주면 loss에 대한 각 변수들의 gradient가 계산됩니다. 

 

그리고 나서 optim.step()을 통해서 optimizer에 지정된 모델의 매개변수들을 계산된 gradient를 가지고 업데이트 해줍니다.

 

 

    # ======================================== #
    # =====Phase 2 : Discriminator Loss======= #
    # ======================================== #

    Q.eval()
    z_real_gauss = Variable(torch.randn(images.size()[0], z_red_dims) * 5.).cuda()
    D_real_gauss = D_gauss(z_real_gauss)

    z_fake_gauss = Q(images)
    D_fake_gauss = D_gauss(z_fake_gauss)

    D_loss = -torch.mean(torch.log(D_real_gauss + EPS) + torch.log(1 - D_fake_gauss + EPS))

    writer.add_scalar("Train/Discriminator_loss", D_loss.item() / len(images), step)

    D_loss.backward()
    optim_D.step()

 

다음 단계는 Discriminator loss를 계산하고, 업데이트 하는 코드입니다.

 

먼저, Q(Encoder)를 추론 모드로 전환해줍니다. (Q.eval()을 하면 해당 모델의 파라미터가 업데이트 되지 않습니다.)

 

z_real_gauss라는 이름으로 평균이 0이고 표준편차가 5인 가우시안 분포에서 임의의 값들을 만들어냅니다.

 

torch.randn(images.size()[0], z_red_dims)를 이용하면 평균이 0이고 표준편차가 1짜리인 (100, 2) shape를 가지는 값들을 만들게 되는데, 여기에 5.를 곱해서 표준편차가 5가 되도록 만든 코드라고 보시면 됩니다.

 

이 가우시안 분포 값을 Discriminator에 넣어서 나온 결과를 D_real_gauss로 저장합니다. 

 

다음으로는 우리가 가지고 있는 training image를 Encoder에 넣어서 나온 결과를 z_fake_gauss로 저장하고, 이를 Discriminator에 넣어서 나온 결과를 D_fake_gauss로 저장합니다. 

 

그리고나서, GAN의 Discriminator loss 계산 방법을 그대로 적용하여 D_loss를 계산해줍니다. 

 

참고로, 여기서 EPS를 각각 더해주는데, 궁금해서 이걸 빼보니까 에러가 나더라고요. 확실하진 않은데 이미지가 모두 0이여 버리면 gradient 연산을 해줄 때 문제가 발생하나봅니다. 그래서 엄청나게 작은 양수를 넣어줘서 그런 연산 이슈를 해결하려고 EPS라는 값을 넣어주는 것 같습니다. 1e-15면 엄청나게 작은 값이라서 실제 연산 결과에는 영향을 미치지 않을 정도죠.

 

이번 코드에서는 Discriminator만 학습을 시켜주면 되기 때문에 D_loss.backward()를 해주고 optim_D.step()을 해줍니다.

 

 

 

    # ======================================== #
    # =======Phase 3 : Generator Loss========= #
    # ======================================== #

    Q.train()

    z_fake_gauss = Q(images)
    D_fake_gauss = D_gauss(z_fake_gauss)
    
    G_loss = - torch.mean(torch.log(D_fake_gauss + EPS))

    G_loss.backward()
    optim_Q_gen.step()

    writer.add_scalar("Train/Generator_loss", G_loss.item() / len(images), step)

마지막으로 Generator loss를 계산해봅니다.

 

이번에는 encoder이자 generator인 Q를 학습해야 하므로, Q.train()으로 학습 모드를 만들어줍니다.

 

그리고 training image를 encoder에 넣어서 latent vector로 만들어주고, 이를 Discriminator에 넣어서 계산해줍니다.

 

GAN에서 얘기한 Generator loss를 동일하게 적용해주면 완성이 됩니다.

 

 

 if (step+1) % 100 == 0:
        print('Step [%d/%d], Recon_Loss: %.4f, D_loss: %.4f, G_loss: %.4f' 
              %(step+1, total_step, recon_loss.item() / len(images), D_loss.item() / len(images), G_loss.item() / len(images)))

학습이 잘 되고 있는지 확인해야 하니까, step 100번마다 reconstruction loss와 discriminator loss, generator loss가 찍히도록 짰습니다.

 

 

 

if (step+1) % 300 == 0:

        # Test the model's ability to reconstruct test set image. 
        
        Q.eval()
        P.eval()

        sample, _ = next(iter(testloader))

        sample = to_var(sample.view(sample.size(0), -1)) # (100, 784)

        z_ = Q(sample) # (100, 120)
        recon_ = P(z_) # (100, 784)

        n = min(sample.size(0), 8)
        comparison = torch.cat([sample.view(100, 1, 28, 28)[:n], recon_.view(100, 1, 28, 28)[:n]])
        grid = torchvision.utils.make_grid(comparison.cpu())
        writer.add_image("Test image - Above: Real data, below: reconstruction data", grid, (step+1)/300 )
        
        # Test the model's ability to decode latent variable to image data
        
        z_test = Variable(torch.randn(16, z_red_dims) * 5.).cuda()
        recon_image = P(z_test).cpu()
        grid = torchvision.utils.make_grid(recon_image.view(16, 1, 28, 28))
        writer.add_image("Latent to Image", grid, (step+1)/300 )

그리고 300번마다 제대로 잘 학습하고 있는지를 파악하기 위해서 다음과 같은 코드를 짰습니다.

 

먼저, Encoder와 Decoder는 inference mode로 바꿔주고, testloader에서 샘플을 받아옵니다.

 

즉, 모델이 아직 본적 없는 이미지들에 대해서 시험을 해보는 것이죠.

 

학습된 Q(Encoder)에 test 이미지를 넣어서 latent vector로 만들어줍니다. 이를 z_로 저장해주고요.

 

그리고 이 z_를 가지고 Decoder에 집어 넣어서 원래 이미지를 복원해줍니다. 이를 recon_으로 저장해줍니다.

 

다음으로는 원래 기존 test image 중에서 8개, 그리고 복원된 이미지 중에서 8개를 가져온 다음 이를 torch.cat을 통해서 concat 해줍니다. sample.view(100, 1, 28, 28)[:n]를 하면 (8, 1, 28, 28)가 될 것이고, recon_.view도 마찬가지로 (8, 1, 28, 28)가 됩니다.

 

여기에 torch.cat을 사용해주면 (16, 1, 28, 28)짜리 이미지가 만들어질 것이고, 이를 torchvision의 make_grid를 통해서 그리드 이미지로 만들어줍니다. 이 때, comparison은 모두 cuda에 올라가 있으므로, cpu()를 통해서 다시 cpu쪽으로 이동시켜줍니다. (GPU를 활용한 cuda 연산을 할 때는 값들이 cuda에 올라가 있는지 cpu에 있는지를 잘 확인해야 합니다.)

 

이를 writer.add_image를 통해서 tensorboard레 써줍니다.

 

그리고 z_test라는 이름으로 평균이 0이고 표준편차가 5짜리로 (16, 2) 차원의 랜덤한 값들을 만들어주고,

 

이를 Decoder에 넣어서 복원해줍니다. recon_image라고 저장해뒀죠.

 

그리고 이를 (16, 1, 28, 28)로 shape를 변경하고, 다시 한번 make_grid를 이용해서 그리드 이미지로 만들어줍니다.

 

이를 tensorboard에 똑같이 저장해주죠.

 

 

해당 코드를 통해서 보고자 하는 것은 다음과 같습니다.

 

1) 모델이 내가 본 적 없는 테스트 이미지를 이미지 => Encoder => Decoder => 복원 과정을 거치면서 제대로 복원 시킬 수 있는 능력을 갖추고 있는가?

 

2) 임의의 $N(0, 5)$를 따르는 값들을 만들어서 Decoder에 투입하였을 때, 적절한 형태의 이미지를 만들어낼 수 있는가? 

 

 

# save the Model
torch.save(Q.state_dict(), os.path.join(saved_loc, 'Q_encoder_weights.pt'))
torch.save(P.state_dict(), os.path.join(saved_loc, 'P_weights.pt'))
torch.save(D_gauss.state_dict(), os.path.join(saved_loc, 'D_weights.pt'))

마지막은 그냥 모델 weight 저장하는 코드니까, 별도의 설명 없이 넘어가겠습니다. 

 

 

여기까지가 코드이고, 다음으로는 결과를 보여드리겠습니다.

 

 

 

Train 과정에서의 Reconstruction error와 Discriminator loss, Generator loss입니다.

 

Reconstruction error는 계속해서 줄어들고 있는 모습을 보이고 있구요.

 

Discriminator loss는 우상향, Generator loss는 우하향 하고 있는 모습을 보이고 있습니다.

 

Generator loss가 $-log(D(G(z)))$ 임을 감안한다면, 이것이 감소한다는 의미는 $log(D(G(z)))$가 증가한다는 의미이고

 

Log 그래프를 생각해보시면, 이것이 증가한다는 의미는 Discriminator가 $G(z)$를 더욱 진짜라고 판단한다 라고 볼 수 있습니다.

 

 

다음으로는, Test를 보겠습니다.

 

step 1200 일 때의 결과 - latent dim : 2

아까 위에서 설명드린대로, 이는 (16, 2) 차원의 Gaussian random vector를 가지고 Decoder를 이용해 복원한 결과입니다.

 

완전 초창기 결과라서 성능이 매우 좋지 않음을 확인할 수 있습니다.

 

 

step 60000 일 때의 결과 - latent dim : 2

 

다음은 100 epoch, 즉 step 60000일 때(학습 완료)의 결과입니다. 아까보다는 조금 더 진짜 같은 이미지들이 나오고 있기는 하지만, 생각보다는 성능이 썩 좋지 않네요.

 

 

다음으로는 복원 성능을 보겠습니다.

 

 

step 1200 일 때의 복원 성능 - latent dim : 2

 

완전 초창기라서 사실상 이미지를 제대로 복원하지 못하고 있음을 확인할 수 있습니다.

 

 

 

step 60000 일 때의 복원 성능 - latent dim : 2

 

 

100 에폭 동안 학습하고 난 결과 입니다. 9가 3처럼 나오는 등 살짝 문제가 있기는 하지만, 그래도 훨씬 더 복원 성능이 좋아졌습니다.

 

 

 

다음으로는, latent dimension을 15로 했을 때의 결과를 보여드리겠습니다.

 

 

 

이전보다 Generator 그래프와 Reconstruction 그래프는 훨씬 깔끔해진 것을 확인할 수 있었습니다.

 

 

반면에, Discriminator의 그래프가 영 심상치 않네요.

 

 

다음으로는, Test도 보겠습니다.

 

 

step 1200 일 때의 결과 - latent dim: 15

 

역시나 학습 초기라 멀쩡하지 못한 이미지가 나오고 있습니다.

 

 

step 60000 일 때의 결과 - latent dim : 15

 

뭔가 글씨의 모습이 썩 좋지 못하게 나왔습니다. 오히려 잠재 공간이 2차원일 때 보다 더욱 안 좋아진 것 같습니다.

 

 

다음으로는 복원 성능도 보겠습니다.

 

 

step 1200 일 때의 결과 - latent dim : 15

 

step 60000 일 때의 결과 - latent dim : 15

 

latent 차원이 15차원일 때, 복원 성능은 엄청나게 좋아진 것을 확인할 수 있습니다.

 

Decoder 성능이 매우 안 좋게 나왔음에도 15차원의 결과를 여기에 담게 된 이유가 바로 복원 성능 때문인데요.

 

latent dimension이 2일 때는 100 epoch 정도 돌면 대략 reconstruction error가 110 ~ 120 정도가 나옵니다.

 

하지만 latent dimension이 15일 때는 100 epoch정도 돌면 대략 reconstruction error가 65 ~ 70 정도가 나옵니다.

 

이를 통해 latent dimension이 증가할 수록 reconstruction error가 좋아질 수 있다는 사실을 확인할 수 있었습니다.

 

다만, Decoder의 성능이 안 좋아지는 문제가 있어서 약간 아쉬움이 남습니다.

 

 

 

마지막으로, latent vector의 값을 (-10, -10)부터 (10, 10)까지 균등하게 이동시키면서 복원했을 때 숫자가 어떻게 변하는지를 나타내는 그림을 보여드리고 마무리 지으려고 합니다.

 

 

 

해당 그림을 그리는 코드는, 제 Github에 올라가 있으니 확인해보시면 되겠습니다.

 

(모델과는 크게 관련이 없어 따로 해당 포스팅에서는 다루지 않습니다.)

 

 

 

개인적으로는 굉장히 관심이 있었던 모델임에도 불구하고, 생각보다 논문에 나온 것 처럼 깔끔한 성능을 내기가 너무 어려웠다는 점이 아쉬웠습니다.

 

 

모델의 구조를 이해하거나 이런 부분이 어려운건 아니였지만, 막상 구현했을 때 논문에 나온 성능이 나와야 의미가 있는 것이겠죠.

 

 

생성되는 것을 봐서는 모델 구조를 잘못 짰다거나 하진 않은 것 같은데, 쉽지 않네요 ㅠㅠ

 

 

추후에 이 문제를 해결할 수 있는 방법을 알게 된다면 해결해보도록 하겠습니다.

 

 

해당 포스팅에 사용된 코드는 다음 Github 주소에 올라가 있습니다.

 

github.com/PeterKim1/paper_code_review

 

 

+ Recent posts