
8. OpenCV | CV

8/2 (Wed) IT K-DT (Day 103) / 3. Classification Practice

 

Write code that looks at a chest X-ray and distinguishes 1. a normal person's lungs, 2. the lungs of a COVID-19 patient, and 3. the lungs of a pneumonia patient.
(Use the VGG19 (classification) model / organize the Dataset, DataLoader, and Train/Test code as classes / predict on the test set and visualize the results)

 


Creating a virtual environment
* Since two versions of Python are installed, it is better to create an env virtual environment and run inside it.

Set the working directory in cmd, then run in the terminal:
pip install pipenv

When two or more versions of Python are installed, specify which Python version the virtual environment should use.
Here, Python 3.8 will be used.
pipenv --python 3.8

Activate the virtual environment (spawns a shell inside it):
pipenv shell

Install the kernel:
pipenv install ipykernel

Register the kernel for use in Jupyter Notebook // both the display name and the internal name are set to test // refresh the page, then Kernel → Change Kernel.
python -m ipykernel install --user --display-name test --name test

While writing the code, an error occurred when importing cv2, so the virtual environment was abandoned.
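If the environment is revisited later, a likely fix (an assumption, since the exact cause of the cv2 error was not recorded) is that opencv-python and the other packages were installed into the system Python rather than into the pipenv environment; installing them inside the environment should then let the import succeed:
pipenv install opencv-python matplotlib torch torchvision ipywidgets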

 

!pip install opencv-python
!pip install matplotlib
!pip install torch
!pip install torchvision
!pip install ipywidgets # provides interact(), used to create the slider bar

 

import os
import cv2
import torch
import warnings
import copy
warnings.filterwarnings('ignore') # ignore warnings
from ipywidgets import interact
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as f

 

image_files = []

# data_dir: Covid19-dataset/train
# sub_dir : Normal

# Function that gathers the image files into a list

def list_image_file(data_dir, sub_dir):
    image_format = ['jpeg', 'jpg', 'png']
    image_files = []
    image_dir = os.path.join(data_dir, sub_dir) # Covid19-dataset/train/Normal
    for file_path in os.listdir(image_dir): # list the contents of the directory
        # split on '.' and continue only when the extension (the last part) is in image_format
        if file_path.split('.')[-1] in image_format:
            image_files.append(os.path.join(sub_dir, file_path))
    return image_files

 

data_dir = './Covid19-dataset/train/'
normals_list = list_image_file(data_dir, 'Normal')
covids_list = list_image_file(data_dir, 'Covid')
pneumonia_list = list_image_file(data_dir, 'Viral Pneumonia')

 

print(len(normals_list))
print(len(covids_list))
print(len(pneumonia_list))

 

 

# Function that converts an image to RGB (cv2 loads images as BGR)

def get_RGB_image(data_dir, file_name):
    image_file = os.path.join(data_dir, file_name)
    image = cv2.imread(image_file)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image

 

# Of the 111 images in covids_list, only 70 will be viewable with the slider; the remaining 41 are not shown

min_num_files = min(len(normals_list), len(covids_list), len(pneumonia_list))
min_num_files

 

 

# Create a slider bar

@interact(index=(0, min_num_files-1)) # range: 0~69
def show_samples(index=0): # called every time the slider moves
    # convert each image at the given index to RGB, then display the returned image
    normal_image = get_RGB_image(data_dir, normals_list[index])
    covid_image = get_RGB_image(data_dir, covids_list[index])
    pneumonia_image = get_RGB_image(data_dir, pneumonia_list[index])
    
    plt.figure(figsize=(12,8))
    plt.subplot(131)
    plt.title('normal')
    plt.imshow(normal_image)
    
    plt.subplot(132)
    plt.title('covid')
    plt.imshow(covid_image)
    
    plt.subplot(133)
    plt.title('pneumonia')
    plt.imshow(pneumonia_image)
    plt.tight_layout() # automatically fits the subplots into the figure layout

 

 

# Building the dataset with a class

train_data_dir = './Covid19-dataset/train/'
class_list = ['Normal', 'Covid', 'Viral Pneumonia'] # multi-class, since there are three classes

 

class Chest_dataset(Dataset):
    def __init__(self, data_dir, transform=None): # collect and store the file paths under the given directory
        self.data_dir = data_dir
        normals = list_image_file(data_dir, 'Normal')
        covids = list_image_file(data_dir, 'Covid')
        pneumonias = list_image_file(data_dir, 'Viral Pneumonia')
        self.files_path = normals + covids + pneumonias
        self.transform = transform
        
    # length of the dataset
    def __len__(self):
        return len(self.files_path)
    
    def __getitem__(self, index):
        image_file = os.path.join(self.data_dir, self.files_path[index])
        image = cv2.imread(image_file)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # the class label comes from the parent folder name, e.g. 'Normal/xxx.jpeg' -> 'Normal'
        target = class_list.index(self.files_path[index].split(os.sep)[-2])
        if self.transform:
            image = self.transform(image)
            target = torch.Tensor([target]).long() # convert the label to an integer tensor
        
        return {'image':image, 'target':target}

 

dset = Chest_dataset(train_data_dir) # no transform yet, so the raw image and integer label can be inspected below

 

index = 100
plt.title(class_list[dset[index]['target']])
plt.imshow(dset[index]['image'])

 

 

transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
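For reference, ToTensor scales the uint8 pixels to [0, 1], and Normalize with mean 0.5 and std 0.5 then maps each channel value x to (x - 0.5) / 0.5, i.e. roughly [-1, 1]. A minimal check of that arithmetic:

x = torch.tensor([0.0, 0.5, 1.0])  # possible pixel values after ToTensor
print((x - 0.5) / 0.5)             # tensor([-1., 0., 1.]): the range Normalize produces here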

 

class Chest_dataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        normals = list_image_file(data_dir, 'Normal')
        covids = list_image_file(data_dir, 'Covid')
        pneumonias = list_image_file(data_dir, 'Viral Pneumonia')
        self.files_path = normals + covids + pneumonias
        self.transform = transform
        
    def __len__(self):
        return len(self.files_path)

    def __getitem__(self, index):
        image_file = os.path.join(self.data_dir, self.files_path[index])
        image = cv2.imread(image_file)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        target = class_list.index(self.files_path[index].split(os.sep)[-2])
        if self.transform:
            image = self.transform(image)
            target = torch.Tensor([target]).long()
         
        return {'image':image, 'target':target}

 

train_dset = Chest_dataset(train_data_dir, transformer)

 

index = 100
image=train_dset[index]['image']
label=train_dset[index]['target']
print(image.shape, label)

 

 

def build_dataloader(train_data_dir, val_data_dir):
    dataloaders = {}
    train_dset = Chest_dataset(train_data_dir, transformer)
    dataloaders['train'] = DataLoader(train_dset, batch_size=4, shuffle=True, drop_last=True) 
    # drop_last=True: drops the final incomplete batch during training
    
    val_dset = Chest_dataset(val_data_dir, transformer)
    dataloaders['val'] = DataLoader(val_dset, batch_size=1, shuffle=False, drop_last=False)
    return dataloaders
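A small illustration of what drop_last does, using a toy dataset of made-up size (not the actual chest X-ray counts):

from torch.utils.data import TensorDataset

toy = TensorDataset(torch.arange(10))
print(len(DataLoader(toy, batch_size=4, drop_last=True)))   # 2: the incomplete final batch is dropped
print(len(DataLoader(toy, batch_size=4, drop_last=False)))  # 3: the incomplete final batch is kept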

 

train_data_dir = './Covid19-dataset/train/'
val_data_dir = './Covid19-dataset/test/'
dataloaders = build_dataloader(train_data_dir, val_data_dir)

 

for i, d in enumerate(dataloaders['train']):
    print(i, d) # i: batch index / d: dictionary with 'image' and 'target' keys
    if i == 0:  # check that batches of 4 samples come out correctly
        break

 

 

# Check the shape of the batched target
d['target'].shape
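Because __getitem__ wraps the label as torch.Tensor([target]) with shape (1,), the default collate stacks a batch into shape (batch_size, 1), e.g. torch.Size([4, 1]) here. That extra dimension is why train_one_epoch below calls target.squeeze(dim=1) before computing the loss:

print(d['target'].squeeze(dim=1).shape)  # torch.Size([4]): the 1-D class-index shape CrossEntropyLoss expects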

 

 

Loading the VGG19 (classification) model
VGG stands for Visual Geometry Group; VGG19 is a standard deep CNN architecture with many stacked layers.

 

 

# Use the models module from torchvision
# pretrained=True loads weights already trained on ImageNet (transfer learning)
model = models.vgg19(pretrained=True)
model

 

 

def build_vgg19_based_model(device_name='cpu'):
    device = torch.device(device_name)
    model = models.vgg19(pretrained=True)
    model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
    model.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(512,256),
        nn.ReLU(),
        nn.Linear(256,len(class_list)), # takes the 256 features down to the number of classes
        nn.Softmax(dim=1)
    )
    return model.to(device)
    
model = build_vgg19_based_model()
    
# Loss function
loss_func = nn.CrossEntropyLoss(reduction='mean')

optimizer = torch.optim.SGD(model.parameters(), lr=1E-3, momentum=0.9)

# Accuracy helper
@torch.no_grad()
def get_accuracy(image, target, model):
    batch_size = image.shape[0]
    prediction = model(image)
    _, pred_label = torch.max(prediction, dim=1)
    is_correct = (pred_label == target)
    return is_correct.cpu().numpy().sum() / batch_size
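A quick way to see the helper on its own, using random tensors rather than real X-rays (a sketch; with the untrained head and random inputs the value just reflects chance, roughly 1/3):

dummy_image = torch.randn(4, 3, 224, 224)  # a fake batch in the model's expected shape
dummy_target = torch.tensor([0, 1, 2, 0])  # arbitrary class indices
print(get_accuracy(dummy_image, dummy_target, model))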

 

def train_one_epoch(dataloaders, model, optimizer, loss_func, device):
    losses = {}
    accuracies = {}
    
    for tv in ['train', 'val']:
        running_loss = 0.0
        running_correct = 0
        if tv == 'train':
            model.train()
        else:
            model.eval()
        for index, batch in enumerate(dataloaders[tv]):
            image = batch['image'].to(device)
            target = batch['target'].squeeze(dim=1).to(device)
            
            with torch.set_grad_enabled(tv=='train'): # enable gradient tracking only during training
                prediction = model(image)
                loss = loss_func(prediction, target)
                if tv == 'train':
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    
                running_loss += loss.item()
                running_correct += get_accuracy(image, target, model)
                
                if tv == 'train':
                    if index % 10 == 0:
                        print(f"{index}/{len(dataloaders['train'])} - Running loss: {loss.item()}")
        losses[tv] = running_loss / len(dataloaders[tv])
        accuracies[tv] = running_correct / len(dataloaders[tv])
    return losses, accuracies

 

def save_best_model(model_state, model_name, save_dir='./trained_model'):
    os.makedirs(save_dir, exist_ok=True)
    torch.save(model_state, os.path.join(save_dir, model_name))

 

device = torch.device('cpu')

train_data_dir = './Covid19-dataset/train/'
val_data_dir = './Covid19-dataset/test/'
dataloaders = build_dataloader(train_data_dir, val_data_dir)
model = build_vgg19_based_model(device_name='cpu') # use the 3-class model defined above; its weights are what get saved and reloaded later
loss_func = nn.CrossEntropyLoss(reduction='mean')
optimizer = torch.optim.SGD(model.parameters(), lr=1E-3, momentum=0.9)

 

num_epochs = 10
best_acc = 0.0
train_loss, train_accuracy = [], []
val_loss, val_accuracy = [], []
for epoch in range(num_epochs):
    losses, accuracies = train_one_epoch(dataloaders, model, optimizer, loss_func, device)
    train_loss.append(losses['train'])
    val_loss.append(losses['val'])
    train_accuracy.append(accuracies['train'])
    val_accuracy.append(accuracies['val'])
    print(f"{epoch+1}/{num_epochs} - Train Loss:{losses['train']}, Val Loss:{losses['val']}")
    print(f"{epoch+1}/{num_epochs} - Train Accuracy:{accuracies['train']}, Val Accuracy:{accuracies['val']}")
    
    if(epoch > 3) and (accuracies['val']>best_acc):
        best_acc = accuracies['val']
        best_model = copy.deepcopy(model.state_dict())
        save_best_model(best_model, f'model_{epoch+1:02d}.pth')
        
print(f'Best Accuracy: {best_acc}')

 

 

plt.figure(figsize=(6,5))

plt.subplot(211)
plt.plot(train_loss, label='train')
plt.plot(val_loss, label='val')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.grid('on')
plt.legend()

plt.subplot(212)
plt.plot(train_accuracy, label='train')
plt.plot(val_accuracy, label='val')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.grid('on')
plt.legend()

plt.tight_layout()

 

 

def preprocess_image(image):
    transformer = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224,224)),
        transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5])
    ])
    
    tensor_image = transformer(image) # shape (C, H, W)
    tensor_image = tensor_image.unsqueeze(0) # shape (B, C, H, W) // adds a batch dimension at index 0
    return tensor_image
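A shape check on preprocess_image with a dummy array standing in for a cv2-loaded X-ray (the sizes here are arbitrary):

import numpy as np

dummy = (np.random.rand(300, 400, 3) * 255).astype(np.uint8)  # H, W, C uint8 image, like cv2.imread output
print(preprocess_image(dummy).shape)  # torch.Size([1, 3, 224, 224]): batch dim added by unsqueeze(0)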

 

def model_predict(image, model):
    tensor_image = preprocess_image(image)
    prediction = model(tensor_image)
    
    _, pred_label= torch.max(prediction.detach(), dim=1)
    print('predict detach(): ', prediction.detach())
    print('pred_label: ', pred_label)
    pred_label = pred_label.squeeze(0)
    return pred_label.item()

 

ckpt = torch.load('./trained_model/model_08.pth')
model = build_vgg19_based_model()
model.load_state_dict(ckpt)
model.eval()

 

 

data_dir = './Covid19-dataset/test' # predict on the test set, as the test_* list names below indicate
test_normals_list = list_image_file(data_dir, 'Normal')
test_covids_list = list_image_file(data_dir, 'Covid')
test_pneumonia_list = list_image_file(data_dir, 'Viral Pneumonia')

class_list = ['Normal', 'Covid', 'Viral Pneumonia']

 

# Create a slider bar

min_num_files = min(len(test_normals_list), len(test_covids_list), len(test_pneumonia_list))

@interact(index=(0, min_num_files-1)) # range: 0 to min_num_files-1
def show_samples(index=0): # called every time the slider moves
    # convert each image at the given index to RGB, run a prediction, then display the result
    normal_image = get_RGB_image(data_dir, test_normals_list[index])
    covid_image = get_RGB_image(data_dir, test_covids_list[index])
    pneumonia_image = get_RGB_image(data_dir, test_pneumonia_list[index])
    
    prediction_1 = model_predict(normal_image, model)
    prediction_2 = model_predict(covid_image, model)
    prediction_3 = model_predict(pneumonia_image, model)
    
    plt.figure(figsize=(12,8))
    
    plt.subplot(131)
    plt.title(f'Normal, Pred:{class_list[prediction_1]}')
    plt.imshow(normal_image)
    
    plt.subplot(132)
    plt.title(f'Covid, Pred:{class_list[prediction_2]}')
    plt.imshow(covid_image)
    
    plt.subplot(133)
    plt.title(f'Pneumonia, Pred:{class_list[prediction_3]}')
    plt.imshow(pneumonia_image)
    plt.tight_layout() # automatically fits the subplots into the figure layout