3D-MRI-Cheatsheet

A collection of code snippets I use frequently when working with 3D MRI images.

Table of Contents

  1. Frequently Used Libraries
  2. Image Preprocessing
  3. Label Preprocessing
  4. Training
  5. Pretrained Model
  6. Figure and Table
  7. Miscellaneous Code

Frequently Used Libraries

pydicom

A library for reading and writing DICOM images.

pip install pydicom
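
A minimal usage sketch (FILENAME is a placeholder; assumes the file contains pixel data):

from pydicom import dcmread

ds = dcmread(FILENAME)         # read a .dcm file into a Dataset
arr = ds.pixel_array           # pixel data as a numpy array
print(ds.Modality, arr.shape)  # header fields are attributes on the dataset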

Image Preprocessing

Save 3D images as a numpy array

# source reference: https://pydicom.github.io/pydicom/dev/auto_examples/image_processing/reslice.html
import os
import numpy as np
import pandas as pd
from pydicom import dcmread

df = pd.read_csv('./data.csv', index_col=0)  # index column contains folder names in my data
images = []
for fname in df.index:
    img_dir = os.listdir(fname)
    files = []
    for img in img_dir:
        files.append(dcmread(fname + '/' + img))
    
    slices = []
    skipcount = 0
    for f in files:
        if hasattr(f, 'SliceLocation'):
            slices.append(f)
        else:
            skipcount += 1
    slices = sorted(slices, key=lambda s: s.SliceLocation)
    
    img_shape = list(slices[0].pixel_array.shape)
    img_shape.append(len(slices))
    image = np.zeros(img_shape)
    
    for i, ds in enumerate(slices):
        data = ds.pixel_array
        image[:,:,i] = data
    images.append(image)
arr = np.array(images)  # to numpy array

Convert grayscale to RGB

arr = np.stack((arr,)*3, axis=-1)  # to 3 channel

Crop an image (center crop)

def crop(arr, size):
    h, w = arr.shape
    if size%2==0:
        s = size//2
        arr = arr[(h//2-s):(h//2+s), (w//2-s):(w//2+s)]
    else:
        s = (size+1)//2
        arr = arr[(h//2-s):(h//2+s-1), (w//2-s):(w//2+s-1)]
    return arr
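
Note that crop expects a 2D array; for a 3D volume, apply it slice by slice. A usage sketch (the size 224 is illustrative):

slice_2d = image[:, :, 0]      # one slice of the (H, W, D) volume built above
cropped = crop(slice_2d, 224)  # 224x224 center crop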

Image thresholding

img[img < LOWER] = LOWER
img[img > UPPER] = UPPER
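
np.clip does the same in one call; a sketch with illustrative bounds (not from the source):

img = np.clip(img, 0, 2000)  # hypothetical intensity window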

2D image resizing

from skimage.transform import resize
#...
img = resize(img, (NEW_WIDTH, NEW_HEIGHT), anti_aliasing=True)

3D image resizing

import itertools
import numpy as np

def resize_data(data, new_size_x, new_size_y, new_size_z):  # nearest-neighbor resampling
    initial_size_x, initial_size_y, initial_size_z = data.shape
    
    delta_x = initial_size_x / new_size_x
    delta_y = initial_size_y / new_size_y
    delta_z = initial_size_z / new_size_z
    
    new_data = np.zeros((new_size_x, new_size_y, new_size_z))
    
    for x, y, z in itertools.product(range(new_size_x), range(new_size_y), range(new_size_z)):
        new_data[x, y, z] = data[int(x*delta_x), int(y*delta_y), int(z*delta_z)]
    
    return new_data
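
The loop above is nearest-neighbor resampling; scipy.ndimage.zoom does the same job in one call, with interpolation if desired. A sketch, assuming scipy is available (the output shape can be off by one voxel due to rounding):

from scipy.ndimage import zoom

def resize_data_zoom(data, new_x, new_y, new_z, order=1):
    # order=0 is nearest-neighbor (matches the loop above); order=1 is trilinear
    factors = (new_x / data.shape[0], new_y / data.shape[1], new_z / data.shape[2])
    return zoom(data, factors, order=order)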

Save the resized image to a new file

resized_img = resized_img.astype(ds.pixel_array.dtype)  # PixelData bytes must match the original dtype
ds.PixelData = resized_img.tobytes()
ds.Rows, ds.Columns = resized_img.shape  # don't forget to update Rows and Columns
ds.save_as(FILENAME)

Label Preprocessing

standardize

def standardize(df):  # z-score standardization
    new_df = df.copy()
    def _stand(x):
        mean, std = x.mean(axis=0), x.std(axis=0)
        z = (x - mean) / std
        return z
    
    for col in new_df.columns:
        new_df[col] = _stand(new_df[col])
    return new_df

normalize (min-max)

def normalize(df):  # min-max normalization
    new_df = df.copy()
    
    def _norm(x):
        z = (x - min(x)) / (max(x) - min(x))
        return z
    
    for col in df.columns:
        new_df[col] = _norm(new_df[col])
    
    return new_df

denormalize

def denormalize(z, x):  # x: the original (pre-normalization) data
    x = z*(max(x) - min(x)) + min(x)
    return np.round(x, 4)
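
A usage sketch: pass the normalized value together with the original (pre-normalization) column. The value and COLUMNNAME are placeholders:

pred_norm = 0.42                                # hypothetical normalized prediction
value = denormalize(pred_norm, df[COLUMNNAME])  # df: the original label dataframe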

destandardize + denormalize

def denormalize(z, x):
    mean = x.mean(axis=0)  # x: original data
    x = z*(max(x - mean) - min(x - mean)) + min(x - mean) + mean
    return np.round(x, 4)

Training

Dataset preparation

single column for label

class MyDataset(Dataset):
    
    def __init__(self, df=None, col=0):
        if df is None:
            df = load_data()
        self.X = df['image'].values
        self.y = df.iloc[:, col]
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        image = self.X[idx]
        label = np.array([self.y.iloc[idx]]).astype('float')
        return [image, label]

multiple columns for label

class MyDataset(Dataset):
    
    def __init__(self, df=None):
        if df is None:
            df = load_data()
        self.X = df['image'].values
        self.y = df.iloc[:, :5]
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        image = self.X[idx]
        label = np.array(self.y.iloc[idx]).astype('float')
        return [image, label]

training & validation functions

training

def train_epoch(model, criterion1, criterion2, optimizer, train_loader, scheduler=None):
    # train the model for one epoch;
    # criterion1 (MAE) is optimized, criterion2 (MSE) is tracked for monitoring only
    train_loss1 = 0.0
    train_loss2 = 0.0
    model.train()
    for batch_idx, (inputs, y) in enumerate(train_loader):
        # import data and move to gpu
        inputs, y = inputs.to(device, dtype=torch.float), y.to(device, dtype=torch.float)
        
        # clear the gradients
        optimizer.zero_grad()
        
        # compute the model output
        y_hat = model(inputs)
        loss = criterion1(y_hat, y)
        
        # back propagation
        loss.backward()
        
        # update model weights
        optimizer.step()
        if scheduler is not None:
            scheduler.step()
        
        # accumulate sample-weighted losses
        train_loss1 += loss.item() * inputs.size(0)
        train_loss2 += criterion2(y_hat.detach(), y).item() * inputs.size(0)
        print("=", end='')
    return train_loss1, train_loss2

validation

def valid_epoch(model, criterion, test_loader):
    # validate the model; no gradients needed
    valid_loss = 0.0
    model.eval()
    with torch.no_grad():
        for batch_idx, (inputs, y) in enumerate(test_loader):
            inputs, y = inputs.to(device, dtype=torch.float), y.to(device, dtype=torch.float)
            y_hat = model(inputs)
            loss = criterion(y_hat, y)
            valid_loss += loss.item() * inputs.size(0)
            print("=", end='')
    return valid_loss

Training loop

required libraries

import numpy as np
import torch
from torch import nn, optim
from sklearn.model_selection import train_test_split, KFold
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler

basic settings

dataset = MyDataset()  # load dataset
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # set device

# set loss functions
criterion1 = nn.L1Loss()
criterion2 = nn.MSELoss()

K-fold Cross Validation (k=10)

# source reference: https://medium.com/dataseries/k-fold-cross-validation-with-pytorch-and-sklearn-d094aa00105f
k = 10
splits = KFold(n_splits=k, shuffle=True, random_state=42)
foldperf = {}

for fold, (train_idx, val_idx) in enumerate(splits.split(np.arange(len(dataset)))):
    print('Fold {}'.format(fold+1))
    
    train_sampler = SubsetRandomSampler(train_idx)
    test_sampler = SubsetRandomSampler(val_idx)
    
    train_loader = DataLoader(dataset, batch_size=2, sampler=train_sampler)
    test_loader = DataLoader(dataset, batch_size=2, sampler=test_sampler)
    
    # model = resnet(50, in_channels=1, num_classes=5)  # model_depth = [10, 18, 34, 50, 101, 152, 200]
    # model = inception_v4(num_classes=5, in_channels=1)
    model = inception_resnet_v2(num_classes=5, in_channels=1)
    # model = densenet(121, in_channels=1, num_classes=5)  # model_depth = [121, 169, 201, 264]

    model.load_state_dict(torch.load('{}.pth'.format(type(model).__name__)))
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # lr = 1e-3
    
    model = model.float()
    history = {'train_mae':[], 'test_mae':[], 'train_mse':[]}
    
    for epoch in range(num_epochs):
        train_loss = train_epoch(model, criterion1, criterion2, optimizer, train_loader)
        test_loss = valid_epoch(model, criterion1, test_loader)
        
        train_mae = train_loss[0] / len(train_loader.sampler)
        train_mse = train_loss[1] / len(train_loader.sampler)
        test_mae = test_loss / len(test_loader.sampler)
        
        print("\nEpoch:{}/{} AVG Training MAE Loss:{:.3f} AVG Test MAE Loss:{:.3f}".format(epoch+1,
                                                                                num_epochs,
                                                                                train_mae,
                                                                                test_mae))
        print(" AVG Training MSE Loss:{:.3f}".format(train_mse))
        
        history['train_mae'].append(train_mae)
        history['test_mae'].append(test_mae)
        history['train_mse'].append(train_mse)
    print()
    foldperf['fold{}'.format(fold+1)] = history
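
After all folds finish, the per-fold histories can be summarized; a small sketch using the foldperf dict above:

# average the final-epoch test MAE across the k folds
final_maes = [foldperf['fold{}'.format(f+1)]['test_mae'][-1] for f in range(k)]
print('Mean test MAE over {} folds: {:.3f}'.format(k, sum(final_maes) / k))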

without Cross Validation

# split data to train and validation sets
train, test = train_test_split(dataset, test_size=test_size)  # test_size default 0.25

train_dl = DataLoader(train, batch_size=batch_size, shuffle=True)
test_dl = DataLoader(test, batch_size=batch_size, shuffle=False)

losses = {'train_mae': [], 'test_mae': [], 'train_mse': []}

model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

model.float()

for epoch in range(num_epochs):
    train_loss = train_epoch(model, criterion1, criterion2, optimizer, train_dl)
    test_loss = valid_epoch(model, criterion1, test_dl)
    
    train_mae = train_loss[0] / len(train_dl.dataset)  # divide by sample count, not batch count
    train_mse = train_loss[1] / len(train_dl.dataset)
    test_mae = test_loss / len(test_dl.dataset)
    
    print("\nEpoch:{}/{} AVG Training MAE Loss:{:.3f} AVG Test MAE Loss:{:.3f}".format(epoch+1,
                                                                                num_epochs,
                                                                                train_mae,
                                                                                test_mae))
    print(" AVG Training MSE Loss:{:.3f}".format(train_mse))
        
    losses['train_mae'].append(train_mae)
    losses['test_mae'].append(test_mae)
    losses['train_mse'].append(train_mse)

Re-train

  1. retrain method

def retrain(trainloader, model, epoch, criterion, optimizer, use_cuda=True):
    model.train()
    loss_sum = 0
    for batch_idx, (data, target) in enumerate(trainloader):
        data, target = data.float(), target.float()
        if use_cuda:
            data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)

        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item()

        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.3f}'.format(
                epoch, batch_idx * len(data), len(trainloader.dataset),
                100. * batch_idx / len(trainloader), loss.item()))

    loss_avg = loss_sum / len(trainloader)  # average per-batch loss
    print()
    print('Train Epoch: {}\tAverage Loss: {:.3f}'.format(epoch, loss_avg))
retrain(train_loader, model, 2, criterion, optimizer)

  2. without a helper method

test_model.cuda()
total_step = len(train_loader)
loss_list = []
num_epochs = 2

for epoch in range(num_epochs):
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device, dtype=torch.float), labels.to(device, torch.float)
        
        # Run the forward pass
        outputs = test_model(inputs)
        # print(outputs.shape)  # debug: check the output shape
        loss = criterion(outputs, labels)
        loss_list.append(loss.item())
        
        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print("\nEpoch [{} / {}], Step [{} / {}], Loss: {:4f}\n".format(epoch+1, num_epochs, i+1, total_step, loss.item()))

Data visualization

loss plot

import matplotlib.pyplot as plt

def plot_loss(train_losses, valid_losses, title='model loss'):
    plt.plot(train_losses)
    plt.plot(valid_losses)
    plt.title(title)
    plt.ylabel('loss'); plt.xlabel('epoch')
    if valid_losses:  # show a legend only when validation losses exist
        plt.legend(['train', 'val'], loc='upper left')
    plt.show()
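
A usage sketch with the history dict from the training loop above:

plot_loss(history['train_mae'], history['test_mae'], title='MAE per epoch')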

prediction vs. ground-truth plot

plt.figure(figsize=(15, 15))  # configure figure size

# get prediction
y = {'label': [], 'pred':[]}
for i in range(len(test)):
    image = np.zeros((1, 1, 232, 224, 224))  # (N, C, D, H, W) for my data
    image[0, :, :, :, :] = test[i][0]
    x = torch.from_numpy(image).float().to(device)
    pred = model(x)
    pred = pred.detach().cpu().numpy()
    y['label'].append(denormalize(test[i][1][0], data[data.columns[0]]))
    y['pred'].append(denormalize(pred[0][0], data[data.columns[0]]))

## for multiple labels
# for i in range(5):
#     plt.subplot(3, 2, i+1)

# linear y=x
x_new = np.linspace(0, max(y['pred']))
y_new = x_new
plt.plot(x_new, y_new, c='r')

# scatter plot predictions
plt.scatter(y['label'], y['pred'], c='b')
plt.axis('square')  # prettier
plt.ylabel('prediction'); plt.xlabel('ground truth')  # set axis name

plt.tight_layout()  # prettier
# set x, y limits if needed
plt.xlim([0,1])
plt.ylim([0,1])

plt.show()

Pretrained Model

Save a trained model as a .pth file

torch.save(model.state_dict(), '{}.pth'.format(type(model).__name__)) # save model as class name

Load a .pth file

model = MyModel(in_channels, num_classes)
model.load_state_dict(torch.load('{}.pth'.format(type(model).__name__)))  # ...because the model was saved under its class name
# parameters like in_channels and num_classes must match the saved model

to change the number of classes (e.g. inception-resnet)

_model = inception_resnet_v2()  # default configuration
_model.load_state_dict(torch.load('InceptionResnetV2.pth'))
num_ftrs = _model.fc.in_features
_model.fc = nn.Linear(num_ftrs, new_num_classes)  # replace the final fully connected layer
model = _model

Troubleshooting

Error(s) in loading state_dict

Missing key(s) in state_dict & Unexpected key(s) in state_dict

model.load_state_dict(pretrained, strict=False)  # set strict=False to ignore mismatched keys

Figure and Table

model summary

from models.custom_net import *
import numpy as np
import os
from torchsummary import summary
model = CustomNet(classes=1)
img = np.load('img_npy/' + os.listdir('img_npy/')[0])
shape = [1]  # channel dimension
shape += list(img.shape)
summary(model.cuda(), tuple(shape))  # input_size excludes the batch dimension

figure panel labels

import nibabel as nib
from scipy.ndimage import zoom, rotate

i = 0
plt.figure(figsize=(16, 8))
for key, value in samples.items():
    filename = img_nifti + value.index[0] + '/'
    filename = filename + os.listdir(filename)[0]
    
    img = nib.load(filename)
    img = img.get_fdata()
        
    mip = np.max(img, axis=1)
    mip = zoom(mip, (1,512/mip.shape[1]))
    mip = rotate(mip, 90)

    plt.subplot(2,4,i+1)
    plt.text(10,10, chr(ord('A')+i),
           horizontalalignment='left',
           verticalalignment='top',
           bbox=dict(facecolor='white', alpha=0.6),
           fontsize=12.5)
    plt.imshow(mip, cmap='gray')
    plt.axis('off')
    
    plt.subplot(2,4,i+5)
    plt.text(10,10, chr(ord('A')+i),
       horizontalalignment='left',
       verticalalignment='top',
       bbox=dict(facecolor='white', alpha=0.6),
       fontsize=12.5)
    axial = img[:, :, img.shape[2]//2]
    axial = rotate(axial, 90)
    plt.imshow(axial, cmap='gray')
    plt.axis('off')
    i+=1
plt.tight_layout()
plt.show()
plt.close()

Miscellaneous Code

Clear the CUDA cache

# for "CUDA out of memory" errors
import gc
gc.collect()
torch.cuda.empty_cache()

Measure total elapsed time

import time
start = time.time()
# ... code to time ...
end = time.time()
exec_time = end - start  # return seconds

Numpy arrays

Reorder array axes

e.g., when converting from NHWC to the NCHW layout that PyTorch expects

# if x.shape is (N, H, W, C)
x = x.transpose(0, 3, 1, 2)  # or
x = np.transpose(x, (0, 3, 1, 2))
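
The equivalent reordering on a PyTorch tensor uses permute; a sketch with an illustrative shape:

import torch

x = torch.randn(8, 224, 224, 3)         # example NHWC tensor
x = x.permute(0, 3, 1, 2).contiguous()  # NHWC -> NCHW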

Save an array to a file

x = np.array([[1,2], [3,4]])
np.save(FILENAME, x)

Load an array from a file

x = np.load('{FILENAME}.npy')

Pandas DataFrames

Load a file

# for csv files
df = pd.read_csv(FILENAME)

# specify the index column when loading
df = pd.read_csv(FILENAME, index_col=COLUMN_NUMBER)

View row and column index names

# rows
df.index

# columns
df.columns

Sort by a column

df_sorted = df.sort_values(COLUMNNAME)

Select columns

# single column
df_col = df[COLUMNNAME]

# multiple columns
useful_columns = ['col1', 'col2', 'col3']
df_col = df[useful_columns]

loc, iloc

useful_columns = ['col1', 'col2', 'col3']
df_col = df.loc[:, useful_columns]

useful_rows = ['row1', 'row2', 'row3']
df_row = df.loc[useful_rows, :] # OR
df_row = df.loc[useful_rows]

df.iloc[ROW_NUM]     # select a row by number
df.iloc[:, COL_NUM]  # select a column by number

df[df['name'] == 'HJ']  # boolean indexing

Rename rows and columns

# rows
df = df.rename(index={'old row name': 'new row name'})
# columns
df = df.rename(columns={'old column name': 'new column name'})

# or
df.rename(columns=COLUMNS, index=INDEX, inplace=True)

Save to a new file

df.to_csv(FILENAME)
# to name the index column
df.to_csv(FILENAME, index_label=COLUMNNAME)
# to prevent writing the automatic index
df.to_csv(FILENAME, index=False)

Append a row

df = df.append(NEW_ROW, ignore_index=True)  # removed in pandas 2.0; see the concat sketch below

With ignore_index=True, the appended row is given a fresh index instead of keeping its original one.
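
DataFrame.append was removed in pandas 2.0; a sketch of the concat equivalent, assuming NEW_ROW is a dict mapping column names to values:

df = pd.concat([df, pd.DataFrame([NEW_ROW])], ignore_index=True)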

Add a column

df[COLUMN_NAME] = NEW_COLUMN

One-Hot Encoding

from sklearn.preprocessing import OneHotEncoder
class_names = [[classes[0]], [classes[1]], [classes[2]]]
ohe = OneHotEncoder(sparse=False)  # use sparse_output=False in scikit-learn >= 1.2
ohe.fit(class_names)
# check
classes_oh = ohe.transform(class_names)
print(classes_oh)
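
One-hot vectors can be mapped back to class names with inverse_transform:

print(ohe.inverse_transform(classes_oh))  # recovers the original class names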