안녕하세요.

 

이번 글에서는 ResNet의 code를 리뷰해보겠습니다.

 

ResNet 논문에 대한 내용은 이전 글에서 확인하실 수 있습니다.

 

cumulu-s.tistory.com/33

 

6. Deep Residual Learning for Image Recognition(ResNet) - paper review

안녕하세요. 오늘은 Deep Residual Learning for Image Recognition(ResNet) 논문에 대해서 정리해보겠습니다. 논문 주소: arxiv.org/abs/1512.03385 Deep Residual Learning for Image Recognition Deeper neural..

cumulu-s.tistory.com

 

code review 시작해보겠습니다.

 

 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
from tqdm.auto import tqdm
import torchvision
import torchvision.transforms as transforms
import datetime
import os
from torch.utils.tensorboard import SummaryWriter
import shutil

먼저, 필요한 package들을 가져옵니다.

 

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Data
print('==> Preparing data..')
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(
    root='/content/drive/MyDrive/CIFAR10', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=128, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root='/content/drive/MyDrive/CIFAR10', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2)

current_time = datetime.datetime.now() + datetime.timedelta(hours= 9)
current_time = current_time.strftime('%Y-%m-%d-%H:%M')

saved_loc = os.path.join('/content/drive/MyDrive/ResNet_Result', current_time)
if os.path.exists(saved_loc):
    shutil.rmtree(saved_loc)
os.mkdir(saved_loc)

print("결과 저장 위치: ", saved_loc)

writer = SummaryWriter(saved_loc)

사용할 데이터셋을 가져오는 코드들입니다.

 

 

 

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

첫 번째로 볼 부분은, ResNet에서 사용하는 BasicBlock입니다.

 

논문에서 ResNet18과 ResNet34에 사용되는 Building Block이죠.

 

in_planes와 planes, stride라는 변수를 받아서 생성되게 되며

 

3x3 conv - BN - 3x3 conv - BN - shortcut으로 구성되는 코드라고 보시면 됩니다.

 

논문에서는 다음 그림으로 표현되었죠.

 

 

self.shortcut = nn.Sequential()은 연결만 해주는 코드로, input을 그대로 연결해주는 역할을 합니다.

 

if문은 stride가 1이 아니거나 in_planes != self.expansion*planes인 경우(즉, input으로 들어온 차원과 output에 해당하는 차원이 다른 경우)에 해당하게 되는데, 이 때는 1x1 conv를 진행하도록 해서 차원이 일치하도록 만들어줍니다. 

 

 

만약 input이 32x32x64이고 stride가 2로 설정되어 있는 경우의 shape를 나타낸 그림입니다.

 

k는 kernel size를 의미하고, s는 stride, p는 padding을 의미합니다.

 

input인 32x32x64이었으나, layer가 거치면서 16x16x128가 되었으므로 input을 그대로 더해줄 수 없게 됩니다.

 

따라서, 1x1 conv를 이용해서 channel 수를 맞춰주면서 동시에 stride = 2를 적용하여 가로, 세로 길이도 /2 해서 16x16x128를 맞춰주게 되면 shortcut을 더해줄 수 있게 되는 것입니다.

 

 

input의 shape와 layer를 통과하고 나서의 shape가 동일한 경우는, 다음과 같이 살펴볼 수 있습니다.

 

 

앞에서도 언급했지만, nn.Sequential()은 말 그대로 input 값을 그대로 가지고 있다고 생각하시면 됩니다.

 

BasicBlock에 대한 설명은 이쯤이면 충분한 것 같아 다음 block으로 넘어가겠습니다.

 

class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion *
                               planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

 

다음에 설명할 코드는 Bottleneck 코드입니다.

 

ResNet의 layer가 50층 이상 넘어갈 경우, 연산량을 줄여주기 위해 기존의 3x3 conv를 두 개 쌓아서 만든 layer를 사용하는 것이 아닌, 1x1 conv - 3x3 conv - 1x1 conv를 이용하는 방식을 사용합니다.

 

이번에도 if문에 해당하는 경우 어떻게 흘러가는지를 설명드리도록 하겠습니다.

 

input이 32x32x256인 경우의 상황을 보여주고 있는 그림입니다.

 

먼저, 1x1 conv을 이용해서 차원을 128차원으로 줄여줍니다. 

 

그러고 나서, 3x3 conv 연산을 진행하고, 1x1 conv 연산을 통해서 다시 512차원으로 늘려주게 됩니다.

 

input이 32x32x256이었으므로, shortcut은 input을 그대로 전달해주면 안 됩니다.

 

따라서 Basicblock과 마찬가지로 1x1 conv을 통해서 차원을 맞춰주면서 동시에 stride = 2를 적용해 가로, 세로 사이즈를 /2 해줍니다.

 

다음으로는 stride가 1인 경우에 대해서 보여드리겠습니다.

 

 

stride가 2로 되어서 가로, 세로가 /2이 되는 경우가 아니라면, channel의 수만 중간에 한번 변경하게 됩니다.

 

이는 3x3 conv를 적용할 때 더 적은 차원의 데이터만 적용하도록 만들어서 연산량을 줄이기 위함입니다.

 

이 경우에는 input과 layer를 통과한 output의 shape가 같으므로, nn.Sequential()을 이용해서 shortcut을 진행해주면 됩니다.

 

 

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

 

다음으로는 ResNet class입니다. 

 

논문 리뷰할 때 ResNet 전체 architecture 표에서는 맨 처음 conv가 7x7로 되어 있었는데, 논문의 CIFAR-10 실험한 부분을 보면 CIFAR-10의 데이터가 32x32x3 임을 감안해서 architecture가 살짝 변경되었음을 확인할 수 있습니다.

 

따라서, 해당 코드는 CIFAR-10에 대해 실험을 진행했기 때문에 구조가 살짝 다릅니다. (핵심적인 ResNet 구조는 동일).

 

ResNet을 만들 때는 block과 num_blocks를 입력으로 받습니다. (def __init__의 parameter를 보시면 됩니다.)

 

맨 처음에 3x3 conv - BN을 적용해주고, 그 이후에는 _make_layer 함수를 이용해서 block이 포함된 layer들을 만들어줍니다.

 

여기서 block이란, 앞에서 제가 설명드린 BasicBlock이나 Bottleneck block을 의미합니다. (앞에서 input으로 block을 받는다고 얘기했었죠?)

 

함수가 생각보다는 복잡하기 때문에, 간단하게 하나 예시를 들어서 하나하나 따라가 보겠습니다.

 

 

가장 간단한 형태였던 ResNet18를 예로 들어보도록 하겠습니다.

 

ResNet18은 Basicblock을 사용하며, 각 layer 마다 block을 2개씩 사용합니다.

 

따라서, ResNet18을 만든다고 한다면 net = ResNet(BasicBlock, [2, 2, 2, 2])로 만들 수 있습니다.

 

먼저 input은 32x32x3이 될 것이고요, 첫 번째로 self.conv1 = nn.Conv2d(3, 64, k = 3, s = 1, p = 1)를 통과해줍니다. output은 32x32x64이 됩니다. 여기에 self.bn1 = nn.BatchNorm2d(64)를 통과해줍니다. BN은 shape에 영향을 안 주니 shape는 그대로입니다.

 

다음으로는 self.layer1 = self._make_layer(BasicBlock, 64, 2, stride = 1)을 통과합니다.

 

_make_layer 함수로 들어가서 보면,

 

strides = [1, 1]이 될 것이고, for stride in strides:의 반복문을 적용하게 되면 

 

layers라는 빈 list에 BasicBlock(64, 64, 1)와 BasicBlock(64, 64, 1)을 넣어주고 self.in_planes는 그대로 64입니다. (block.expansion = 1이고, planes = 64이기 때문이죠.)

 

_make_layer는 nn.Sequential(*layers)를 return 하므로, nn.Sequential(BasicBlock(64, 64, 1), BasicBlock(64, 64, 1)) 을 return 합니다. 이를 self.layer1으로 저장하는 것이죠.

 

BasicBlock은 in_planes, planes, stride라고 하는 3개의 parameter를 입력받게 되고, 이것을 각각 (64, 64, 1), (64, 64, 1)로 받았습니다.

 

먼저 self.conv1 = nn.Conv2d(64, 64, k=3, s= 1, p= 1) => self.bn1(nn.BatchNorm2d(64)) => self.conv2 = nn.Conv2d(64, 64, k=3, s=1, p=1) => self.bn2(nn.BatchNorm2d(64)) => self.shortcut으로 연결 => F.relu => out으로 연결되는 구조입니다. 

 

stride가 모두 1이므로, 가로와 세로의 사이즈는 변동되지 않으며, channel 수는 여전히 64가 됩니다.

 

BasicBlock이 두 개였으므로, 위의 과정을 동일하게 두 번 거쳐서 나오면 ResNet18 기준에서 self.layer1을 통과한 것이라고 보시면 되겠습니다.

 

 

다음으로는 self.layer2인데요, 이는 self._make_layer(BasicBlock, 128, 2, stride = 2)를 적용합니다.

 

_make_layer 함수로 들어가게 되면, strides가 이번에는 [2, 1]가 됩니다. 아까는 [1, 1]이었죠.

 

그리고 똑같이 for stride in strides:의 반복문을 이용해서 layers.append를 해주는데, BasicBlock(64, 128, 2)와 BasicBlock(128, 128, 1)을 append 해줍니다. 그리고 이 때는 self.in_planes = planes * block.expansion인데 planes가 128이었으므로, self.in_planes가 64에서 128로 값이 변경됩니다.

 

앞과 동일하게 def _make_layer는 nn.Sequential(*layers)를 return 해주므로, ResNet 기준에서 self.layer2는 nn.Sequential(BasicBlock(64, 128, 2), BasicBlock(128, 128, 1))이 될 것입니다.

 

 

다시 BasicBlock 쪽으로 가봐야겠죠?

 

이번에는 in_planes가 64, planes가 128, stride = 2입니다.

 

이번에는 BasicBlock의 input이 아까 ResNet 기준 self.layer1에서의 output인 32x32x64가 됩니다.

 

self.conv1 = nn.Conv2d(64, 128, k=3, s = 2, p= 1) 이므로, 이를 통과해주면 16x16x128이 되고, self.bn1 = nn.BatchNorm2d(128)을 통과해줍니다.

 

다음으로는 self.conv2 = nn.Conv2d(128, 128, k=3, s= 1, p= 1)를 통과해주고, 이 때는 shape의 변동이 없습니다. self.bn2 = nn.BatchNorm2d(128)도 마찬가지죠.

 

그리고 아까와 달라지는 점은, stride가 1이 아니므로, BasicBlock의 if문에 들어갑니다. 이 때는 self.shortcut이 달라지게 되는데요.

 

self.shortcut = nn.Sequential( nn.Conv2d(64, 128, k=1, s = 21), nn.BatchNorm2d(128))가 됩니다.

 

따라서 input이었던 32x32x64를 16x16x128로 바꾸어서 shortcut이 이루어질 수 있도록 바꿔주게 됩니다.

 

따라서 ResNet 기준 self.layer2의 output은 16x16x128가 될 것입니다.

 

ResNet 기준 self.layer3, self.layer4는 동일한 방법으로 리딩 할 수 있으니 생략하겠습니다.

 

ResNet으로 다시 돌아와서, def forward 부분을 보게 되면 self.layer4가 끝난 후, F.avg_pool2d를 적용하는 것을 확인할 수 있습니다.

 

self.layer2의 output은 16x16x128이고, self.layer3의 output은 8x8x256, self.layer4의 output은 4x4x512입니다.

 

따라서, 4x4x512에 average pooling을 적용해서 이를 1x1x512로 만들어줍니다.

 

그리고. view를 이용해서 이를 Batch sizex512로 바꿔주게 되고, 이를 self.linear와 연결합니다. self.linear는 nn.Linear(512, 10)이므로, 이를 통과하면 Batch size x 10가 되겠죠? 이를 최종 예측값으로 사용해줍니다.

 

여기까지가 ResNet class에 대한 설명이 되겠습니다.

 

 

net = ResNet(BasicBlock, [2, 2, 2, 2]) # ResNet18

net = net.to(device)
cudnn.benchmark = True

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.1,
                      momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

best_acc = 0

net이라는 변수로 ResNet을 생성해주고,. to를 이용해서 GPU로 모델을 이동시켜줍니다.

 

Loss는 Cross Entropy loss를 사용해주고, optimizer는 SGD를 사용합니다. 

 

scheduler는 CosineAnnealingLR이라고 하는 것을 사용했는데, 제가 참고한 자료에 있어서 한번 써봤습니다.

 

해당 scheduler를 사용하면, learning rate가 다음과 같은 형태로 변화합니다.

 

 

 

# Training
def train(epoch):
    print('\nEpoch: %d' % epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    batch_count = 0
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        batch_count += 1

    print("Train Loss : {:.3f} | Train Acc: {:.3f}".format(train_loss / batch_count, 100.*correct/total))
    final_loss = train_loss / batch_count
    final_acc = 100.*correct / total
    return final_loss, final_acc

 

다음은 Training 코드입니다.

 

먼저 신경망을 train 모드로 만들어주고, 필요한 변수들을 지정해줍니다.

 

batch size 단위로 받은 이미지를 net에 투입해서 예측 결과인 outputs을 만들어냅니다.

 

그리고 정답 값인 targets와 loss를 계산해주고, 이를 backpropagation 해줍니다. 

 

계산된 loss는 train_loss라는 변수에 계속해서 누적해서 더해줍니다.

 

outputs.max(1)을 하면 두 개의 값이 나오게 되는데요, 이는 outputs에서 max값을 나타내는 값과 해당 값의 index가 나오게 됩니다. 

 

즉, batch size가 128이라면, outputs은 (128, 10)이며, outputs.max(1)을 통해 나오는 값은 차원이 (128), (128)이 되는 것입니다.

 

우리는 예측된 index만 필요하므로, 값 부분은 _로 처리해주고 predicted라는 변수로 index를 저장해줍니다.

 

그리고 이 predicted와 targets 값이 같은지를 predicted.eq(targets)를 이용해서 계산해주고, 이는 같은지 아닌지를 1과 0으로 저장하게 되므로 sum()을 해서 배치 사이즈인 128개 중에서 몇 개가 정답을 맞혔는지 계산합니다.

 

. item()은 torch.tensor에서 값만 빼주는 코드입니다. 따라서, 이를 통해 128개 중에서 몇 개가 정답을 맞혔는지를 숫자 값으로 저장하게 되죠.

 

 

def test(epoch):
    global best_acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    batch_count = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            batch_count += 1


    print("Test Loss : {:.3f} | Test Acc: {:.3f}".format(test_loss / batch_count, 100.*correct/total))

    # Save checkpoint.
    acc = 100.*correct/total
    if acc > best_acc:
        print('Saving..')
        state = {
            'net': net.state_dict(),
            'acc': acc,
            'epoch': epoch,
        }
        if not os.path.isdir(os.path.join(saved_loc, 'checkpoint')):
            os.mkdir(os.path.join(saved_loc, 'checkpoint'))
        torch.save(state, os.path.join(saved_loc, 'checkpoint/ckpt.pth'))
        best_acc = acc
    
    final_loss = test_loss / batch_count
    return final_loss, acc

 

다음은 test 코드입니다.

 

train 하고 대부분은 큰 차이가 없으므로, 겹치는 부분은 설명을 생략하겠습니다.

 

test에는 checkpoint를 저장하는 부분이 있는데, 먼저 accuracy를 계산해줍니다.

 

그리고 best_acc보다 acc가 크면, best_acc를 acc로 바꿔줍니다. 

 

이때, 신경망의 가중치와, accuracy, epoch 값을 state로 저장하여 이를 torch.save로 저장해줍니다.

 

이를 통해, test 기준에서 가장 좋은 성능을 내는 모델을 저장하도록 합니다.

 

 

for epoch in tqdm(range(200)):
    train_loss, train_acc = train(epoch)
    test_loss, test_acc = test(epoch)

    writer.add_scalars('Loss', {"Train Loss" : train_loss, "Test Loss" : test_loss}, epoch)
    writer.add_scalars('Accuracy', {"Train acc" : train_acc, "test acc" : test_acc}, epoch)
    
    scheduler.step()

writer.close()

 

200 에폭 동안 train과 test를 진행합니다.

 

scheduler.step()은 각 epoch마다 적용되도록 해줍니다.

 

 

 

 

해당 코드를 돌려서 얻을 수 있는 결과는 다음과 같습니다.

 

 

 

위는 accuracy, 아래는 loss graph입니다.

 

epoch이 진행됨에 따라서, training accuracy와 test accuracy 모두 상승하는 것을 확인할 수 있습니다.

 

loss 또한 두 loss가 모두 잘 감소하고 있는 것을 확인할 수 있습니다.

 

 

 

이번 글에서는 ResNet을 코드로 구현한 것에 대해서 살펴보고, 결과까지 살펴보았습니다.

 

오늘 글에서 사용된 코드는 제 github에서 전체 내용을 확인하실 수 있습니다.

 

github.com/PeterKim1/paper_code_review

 

PeterKim1/paper_code_review

paper review with codes. Contribute to PeterKim1/paper_code_review development by creating an account on GitHub.

github.com

 

다음 글에서 뵙겠습니다.

 

감사합니다.

 

 

 

 

 

 

 

+ Recent posts