안녕하세요.
오늘은 저번 글에서 paper review를 진행한 PaDiM의 code review를 진행해보려고 합니다.
Pretrained CNN model을 사용하기 때문에 별도로 학습을 진행할 필요가 없으므로, 일반적으로 학습을 요구하는 model에 비해서는 코드가 간결한 편이라고 생각합니다.
본 글에서 사용되는 모든 코드는 제 Github에서 확인하실 수 있습니다.
https://github.com/PeterKim1/paper_code_review/tree/master/11.%20PaDiM
그럼 시작해보겠습니다.
Training dataset modeling
먼저, Training dataset을 가지고 Multivariate Gaussian distribution의 parameter를 추정하는 작업을 진행합니다.
import random
from random import sample
import argparse
import numpy as np
import os
import pickle
from tqdm import tqdm
from collections import OrderedDict
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import precision_recall_curve
from scipy.spatial.distance import mahalanobis
from scipy.ndimage import gaussian_filter
from skimage import morphology
from skimage.segmentation import mark_boundaries
import matplotlib.pyplot as plt
import matplotlib
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision.models import wide_resnet50_2, resnet18
import datasets.mvtec as mvtec
코드를 실행시키는 데 있어서 필요한 library들을 모두 불러옵니다.
# device setup
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
def parse_args():
parser = argparse.ArgumentParser('PaDiM')
parser.add_argument('--data_path', type=str, default='/content/drive/MyDrive/MVTec')
parser.add_argument('--save_path', type=str, default='./mvtec_result')
parser.add_argument('--arch', type=str, choices=['resnet18', 'wide_resnet50_2'], default='wide_resnet50_2')
return parser.parse_args()
GPU 연산이 가능한지를 use_cuda로 저장하고, 사용 가능한 장치를 device에 저장합니다.
그리고 해당 코드는 .py 파일로 구성되어 있어, argparse를 사용하고 있습니다.
--data_path는 MVTec AD의 경로를 지정해주고, save_path는 결과를 저장할 경로를 지정해줍니다.
--arch는 어떤 model을 사용할 것인지를 결정합니다.
def main():
args = parse_args()
# load model
if args.arch == 'resnet18':
model = resnet18(pretrained=True, progress=True)
t_d = 448
d = 100
elif args.arch == 'wide_resnet50_2':
model = wide_resnet50_2(pretrained=True, progress=True)
t_d = 1792
d = 550
model.to(device)
model.eval()
random.seed(1024)
torch.manual_seed(1024)
if use_cuda:
torch.cuda.manual_seed_all(1024)
idx = torch.tensor(sample(range(0, t_d), d))
이제 main 함수의 시작 부분입니다.
먼저 아까 정의했던 argparse를 args로 저장합니다.
--arch로 선택한 모델의 구조에 따라서, model을 불러옵니다.
t_d는 해당 모델의 Layer 1, Layer 2, Layer 3의 embedding vector를 concatenate 했을 때의 차원의 수를 나타내고, d는 그중에서 random dimensionality reduction을 진행할 random dimension의 수를 나타냅니다.
즉, ResNet18을 사용하면, embedding vector를 448차원으로 얻게 되고, 그중에서 100차원을 랜덤으로 뽑게 된다고 이해해주시면 될 것 같습니다.
idx는 t_d 차원에서 임의로 d차원을 뽑아서 저장해 torch tensor로 저장합니다.
# set model's intermediate outputs
outputs = []
def hook(module, input, output):
outputs.append(output)
model.layer1[-1].register_forward_hook(hook)
model.layer2[-1].register_forward_hook(hook)
model.layer3[-1].register_forward_hook(hook)
다음은 forward hook을 이용해서 모델의 Layer 1, Layer 2, Layer 3의 output을 outputs라는 변수에 저장할 수 있도록 해줍니다.
for class_name in mvtec.CLASS_NAMES:
train_dataset = mvtec.MVTecDataset(args.data_path, class_name=class_name, is_train=True)
train_dataloader = DataLoader(train_dataset, batch_size=32, pin_memory=True)
test_dataset = mvtec.MVTecDataset(args.data_path, class_name=class_name, is_train=False)
test_dataloader = DataLoader(test_dataset, batch_size=32, pin_memory=True)
train_outputs = OrderedDict([('layer1', []), ('layer2', []), ('layer3', [])])
test_outputs = OrderedDict([('layer1', []), ('layer2', []), ('layer3', [])])
mvtec ad의 class 각각에 대해서 for문을 작동시키고, 각 class의 train dataset과 test dataset을 만들어줍니다.
그리고 train dataset의 embedding vector와 test dataset의 embedding vector를 저장하기 위해서 OrderedDict을 이용해 Layer 1, Layer 2, Layer 3에 해당하는 embedding vector를 저장할 장소를 train_outputs과 test_outputs로 지정해둡니다.
# extract train set features
train_feature_filepath = os.path.join(args.save_path, 'temp_%s' % args.arch, 'train_%s.pkl' % class_name)
if not os.path.exists(train_feature_filepath):
for (x, _, _) in tqdm(train_dataloader, '| feature extraction | train | %s |' % class_name):
# model prediction
with torch.no_grad():
_ = model(x.to(device))
# get intermediate layer outputs
for k, v in zip(train_outputs.keys(), outputs):
train_outputs[k].append(v.cpu().detach())
# initialize hook outputs
outputs = []
for k, v in train_outputs.items():
train_outputs[k] = torch.cat(v, 0)
train_feature_filepath는 각 class에 대해서 저장한 Gaussian distribution의 parameter를 pickle 파일로 저장할 저장 경로를 저장하는 변수입니다.
다음으로는 for문을 돌게 되는데, 여기서 x는 train dataset의 이미지를 나타냅니다.
shape를 찍어보면 (32, 3, 224, 224) 임을 확인할 수 있습니다. (위에서 train_dataloader와 test_dataloader의 batch size가 32이기 때문에 32로 시작됩니다.)
그리고 _ = model(x.to(device))는 원래 model의 결과가 별도로 저장되어야 하는데, 본 모델에서는 모델의 최종 output을 필요로 하는 것이 아니라 중간 output을 필요로 하기 때문에 _를 사용하여 별도로 변수에 저장되지 않도록 처리한 것입니다.
이 코드 라인이 실행되면, 앞에서 지정한 forward hook에 의해서 Layer 1, 2, 3에서 얻어지는 중간 output 결과들을 outputs라는 변수에 저장이 됩니다.
for k, v in zip(train_outputs.keys(), outputs): 코드라인은 outputs에 저장된 각각의 요소들을 OrderedDict인 train_outputs의 각 Key에 append 하는 코드입니다.
forward hook이 Layer 1, 2, 3에 각각 작동하기 때문에 outputs에는 총 3개의 데이터가 append 되어 있는 상태입니다.
따라서 for문을 통해 각각을 train_outputs의 Key에 저장할 수 있게 됩니다.
그리고 outputs = []를 통해 중간 output을 저장하는 리스트를 다시 비워서 다음 class에 대해서 동일한 코드가 작동할 때 새롭게 저장될 수 있도록 만들어줍니다.
for k, v in train_outputs.items(): 코드라인은 앞에서 batch 단위로 저장되어 있는 데이터들을 합쳐서 전체로 만들어주는 코드입니다.
예를 들어서, bottle class는 209장의 train 이미지가 있는데, batch는 32이므로 32짜리로 여러 개의 데이터가 저장되어 있을 것입니다.
이를 concat 시켜서 209장 짜리의 embedding vector로 만들어주는 작업이라고 보시면 되겠습니다.
# Embedding concat
embedding_vectors = train_outputs['layer1'] # torch.Tensor, (209, 64, 56, 56)
for layer_name in ['layer2', 'layer3']:
embedding_vectors = embedding_concat(embedding_vectors, train_outputs[layer_name])
다음으로는 Layer 1과 Layer 2, Layer 3에 저장된 embedding vector를 concat 하는 코드입니다.
사실 코드상으로는 이렇게 짧게 되어있지만, embeddding_concat이라는 함수는 생각보다 복잡한 형태로 구성이 되어 있습니다.
def embedding_concat(x, y):
B, C1, H1, W1 = x.size()
_, C2, H2, W2 = y.size()
s = int(H1 / H2)
x = F.unfold(x, kernel_size=s, dilation=1, stride=s)
x = x.view(B, C1, -1, H2, W2)
z = torch.zeros(B, C1 + C2, x.size(2), H2, W2)
for i in range(x.size(2)):
z[:, :, i, :, :] = torch.cat((x[:, :, i, :, :], y), 1)
z = z.view(B, -1, H2 * W2)
z = F.fold(z, kernel_size=s, output_size=(H1, W1), stride=s)
return z
사실 이 함수가 이렇게 복잡하게 된 이유는, 각 Layer 마다의 embedding vector의 사이즈가 전부 다르기 때문입니다.
model을 ResNet18을 사용한다고 가정하였을 때,
Layer 1의 embedding vector는 (Training dataset size, 64, 56, 56)이며
Layer 2의 embedding vector는 (Training dataset size, 128, 28, 28)이고
Layer 3의 embedding vector는 (Training dataset size, 256, 14, 14)입니다.
즉 가로와 세로가 절반씩 줄어드는 구조이기 때문에 단순히 Channel 방향으로 concat 시키는 것이 불가능하기 때문입니다.
일일이 shape를 다 확인하는 것도 방법이긴 하지만 대략 어떤식으로 embedding vector concat이 이루어지는지만 알고 있으면 충분할 것이라고 생각이 됩니다. (물론 코드를 하나하나 자세히 shape 찍어보시는 것도 좋지만 이 부분이 워낙 복잡하여 전체적인 컨셉만 짚고 넘어가려고 합니다.)
(N, 1, 4, 4) 크기의 embedding vector(A)와 (N, 2, 2, 2) 크기의 embedding vector(B)가 있다고 가정해보겠습니다.
N은 training dataset size이나, 4차원은 그림으로 표현할 수 없으므로 생략하고 3차원에 대해서만 그림으로 설명해보겠습니다.
embedding vector A와 B는 다음과 같이 그림으로 표현할 수 있습니다.
이를 위 함수를 이용해서 concat 하면 다음과 같은 방식으로 concat이 진행됩니다.
먼저 높이와 폭 기준으로 더 넓은 쪽인 (N, 1, 4, 4)가 앞쪽에 원래대로 위치하고, 높이와 폭이 1/2인 (N, 2, 2, 2)짜리는 각 위치에서 2x2 사이즈만큼 복사되어서 높이, 폭의 2배 사이즈 차이를 메우게 됩니다.
이게 말로 하려니 설명이 살짝 어려운데, 첫 번째 사진과 두 번째 사진을 보시면서 값들이 어떻게 배치되었는지 확인해주시면 이해가 크게 어렵지는 않을 것이라고 생각합니다.
이러한 방식을 이용하게 되면, 높이와 폭 사이즈는 그대로 유지되면서, 합쳐지는 두 embedding vector의 차원 수만 합쳐지는 결과를 가지고 오게 됩니다.
이런 방식으로 Layer 1, Layer 2, Layer 3에서 나오는 embedding vector를 합칠 수 있게 되고, ResNet18 기준으로 얻어지는 (Training dataset size, 64, 56, 56), (Training dataset size, 128, 28, 28), (Training dataset size, 256, 14, 14)을 concat 하게 되면 (Training dataset size, 64+128+256, 56, 56) 사이즈의 embedding vector를 얻을 수 있게 됩니다.
# randomly select d dimension
embedding_vectors = torch.index_select(embedding_vectors, 1, idx)
위 과정을 통해서 얻어진 embedding vector에서 d 차원만큼을 랜덤으로 선택합니다.
PaDiM 논문에서는 embedding vector 전체를 사용하는 것에 비해서 random 하게 차원을 줄였을 때 시간 복잡도와 공간 복잡도가 줄어들지만 anoamly localization 성능에서는 큰 폭의 차이가 없음을 보였습니다.
# calculate multivariate Gaussian distribution
B, C, H, W = embedding_vectors.size()
embedding_vectors = embedding_vectors.view(B, C, H * W) # (209, 100, 56*56)
mean = torch.mean(embedding_vectors, dim=0).numpy() # (100, 56*56), 샘플 간 평균
cov = torch.zeros(C, C, H * W).numpy() # (100, 100, 56*56) 100차원 간 cov 계산
I = np.identity(C)
for i in range(H * W):
cov[:, :, i] = np.cov(embedding_vectors[:, :, i].numpy(), rowvar=False) + 0.01 * I
# save learned distribution
train_outputs = [mean, cov]
with open(train_feature_filepath, 'wb') as f:
pickle.dump(train_outputs, f)
다음은 얻어진 embedding vector를 가지고 multivariate Gaussian distribution의 parameter를 얻는 코드입니다.
먼저 embedding_vector를 (B, C, H*W)의 차원으로 바꿔줍니다.
ResNet18 기준이라면, (N, 100, 56*56)이 됩니다. (최종적으로 얻어진 embedding vector는 (N, 448, 56, 56)이지만 448차원 중 100차원을 랜덤으로 선택하였으므로 (N, 100, 56, 56)을 얻게 되고, 이를 (N, 100, 56*56)로 reshape 한 것입니다.)
그리고 0차원(데이터 셋 크기) 기준으로 평균을 구합니다. 이는 (100, 56*56)의 size를 가집니다.
의미적으로 본다면, 이미지에서 각 패치의 embedding 평균을 구한 것이 되겠죠?
그다음으로는 공분산을 계산해야 하는데, 먼저 torch.zeros로 0으로 채워진 (100, 100, 56*56) 짜리 tensor를 만들어줍니다.
그리고 for문을 돌면서, cov[:, :, i] = np.cov(embedding_vectors[:, :, i].numpy(), rowvar=False) + 0.01 * I를 실행합니다.
embedding vectors[:, :, i]라면 patch의 각 위치에서의 데이터가 되고, 크기는 (N, 100)이 될 것입니다.
np.cov에 rowvar라는 인자가 있는데, 이는 row를 기준으로 분산을 구할 것인지를 물어보는 것입니다.
rowvar = False이므로, 우리는 row를 기준으로 분산을 구하지 않고 column을 기준으로 분산을 구하게 됩니다.
이렇게 하면 column인 100(embedding vector의 차원 수)을 기준으로 분산을 구할 수 있게 됩니다.
이를 cov[:, :, i]에 저장하게 되죠. cov가 (100, 100, 56*56) 였음을 감안한다면, cov[:, :, i]는 각 patch 위치에서의 (100, 100) 크기의 matrix가 되는 것을 알 수 있습니다.
0.01 * I는 논문에서도 언급하였듯이 regularization term이고, 공분산 matrix의 invertible이 보장되도록 하는 역할을 수행합니다.
마지막으로 우리가 구한 mean과 cov를 pickle.dump을 통해서 pickle 파일로 저장해줍니다.
else:
print('load train set feature from: %s' % train_feature_filepath)
with open(train_feature_filepath, 'rb') as f:
train_outputs = pickle.load(f)
만약 별도로 이미 저장된 pickle 파일이 있다면, 이를 읽어오기만 하면 됩니다.
아마 이미 계산된 mean과 cov가 있는 경우는 별도로 연산하지 않고 바로 불러올 수 있게끔 코드를 짜 놓은 것 같습니다.
Inference
gt_list = [] # label을 담는 list
gt_mask_list = [] # mask를 담는 list
test_imgs = [] # 이미지를 담는 list
다음으로는 test image에 대한 Inference를 진행하겠습니다.
먼저 label(0인지 1인지, 0이면 정상, 1이면 이상입니다.), segmentation mask(정답 mask), 정답 이미지를 담는 list를 먼저 지정합니다.
for (x, y, mask) in tqdm(test_dataloader, '| feature extraction | test | %s |' % class_name):
test_imgs.extend(x.cpu().detach().numpy())
gt_list.extend(y.cpu().detach().numpy())
gt_mask_list.extend(mask.cpu().detach().numpy())
# model prediction
with torch.no_grad():
_ = model(x.to(device))
# get intermediate layer outputs
for k, v in zip(test_outputs.keys(), outputs):
test_outputs[k].append(v.cpu().detach())
# initialize hook outputs
outputs = []
for k, v in test_outputs.items():
test_outputs[k] = torch.cat(v, 0)
# Embedding concat
embedding_vectors = test_outputs['layer1']
for layer_name in ['layer2', 'layer3']:
embedding_vectors = embedding_concat(embedding_vectors, test_outputs[layer_name])
# randomly select d dimension
embedding_vectors = torch.index_select(embedding_vectors, 1, idx)
test_dataloader에서 나오는 이미지 데이터(x)와 라벨(y), segmentation mask(mask)를 각각 앞에서 미리 지정된 list에 extend로 추가합니다.
그다음 코드들은 제가 앞에서 설명한 것과 동일하죠??
이미지를 주고 예측을 하게 만든 다음, 최종 output은 저장하지 않고 forward hook을 이용해서 Layer 1, 2, 3에서 나온 결과들이 저장되도록 만들고 test_outputs에 저장해두는 과정을 거칩니다.
그리고 얻어진 embedding vector를 concat 하고, 이 중에서 랜덤으로 d 차원만 골라줍니다.
# calculate distance matrix
B, C, H, W = embedding_vectors.size() # (N, 100, 56, 56)
embedding_vectors = embedding_vectors.view(B, C, H * W).numpy() # (N, 100, 56*56)
dist_list = []
for i in range(H * W):
mean = train_outputs[0][:, i]
conv_inv = np.linalg.inv(train_outputs[1][:, :, i])
dist = [mahalanobis(sample[:, i], mean, conv_inv) for sample in embedding_vectors]
dist_list.append(dist)
다음으로는 Test image의 embedding_vector에 대해서 Mahalanobis distance를 계산하는 코드입니다.
patch의 각 위치에 대해서 평균과 공분산을 가져오고, distance 계산을 위해 공분산을 역행렬로 바꿔줍니다.
그리고 평균과 공분산을 가지고서 각 test image의 embedding vector에 대해서 distance 계산을 수행합니다.
dist_list = np.array(dist_list).transpose(1, 0).reshape(B, H, W) # (N, 56, 56)
# upsample
dist_list = torch.tensor(dist_list)
score_map = F.interpolate(dist_list.unsqueeze(1), size=x.size(2), mode='bilinear',
align_corners=False).squeeze().numpy() # (N, 224, 224)
계산해둔 distance를 저장한 dist_list는 patch의 각 위치에 대해서 테스트 이미지 개수만큼의 거리 계산을 수행하므로, shape가 (56*56, N)이 됩니다.
이를 transpose 해서 (N, 56*56)로 만들어주고, reshape 해서 (N, 56, 56) 형태로 만들어줍니다.
이렇게 하면 Test image 한 장당 56 x 56 짜리의 anomaly map을 얻을 수 있는 것이죠.
이를 bilinear interpolation을 이용하여 원래 이미지 사이즈인 224x224 사이즈로 늘려줍니다.
# apply gaussian smoothing on the score map
for i in range(score_map.shape[0]):
score_map[i] = gaussian_filter(score_map[i], sigma=4)
# Normalization
max_score = score_map.max()
min_score = score_map.min()
scores = (score_map - min_score) / (max_score - min_score)
#print("score shape; ", scores.shape) # (N, 224, 224)
img_scores = scores.reshape(scores.shape[0], -1).max(axis=1)
구해놓은 score_map에 논문에서 언급된 대로 gaussian filter를 적용해주고, anomaly map을 normalization 해줍니다.
이 작업을 통해서 모든 anomaly map이 0 ~ 1 사이의 값을 가지도록 만들어줄 수 있습니다.
마지막으로 이미지의 anomaly score는 해당 anomaly map의 최댓값을 이용해서 계산해주게 됩니다.
나머지 코드들은 대부분 구해진 값을 이용하여 시각화를 진행하는 코드들이라 해당 코드 리뷰에서는 제외하였습니다.
실제 코드를 구동하는 것에 관심이 있으신 분들은 Github에 올려진 코드를 이용하셔서 직접 실행해보시면 좋을 것 같습니다.
구해진 Embedding vector를 합치는 코드가 조금 까다로워서 보는데 시간이 걸렸지만, 그 외에 코드들은 크게 어려운 코드가 없어 보시면서 이해하는데 어렵지 않을 것이라고 생각합니다.
이번 code review는 여기까지 진행하도록 하겠습니다.
감사합니다.