어떠한 우연한 기회를 통해 NSMC 데이터를 각자 이진 분류 모델을 개발해 추론 자동화하는 시스템을 작동시켰는데,,
생각해보니 지금까지의 난 걍 텐서플로우로 구현되어있던 모델만 보거나,
오픈소스 쓰거나 (KLUE, HuggingFace) 같은,,
정말 제대로 내가 모델 개발해본 적이 있나,, 싶어서 혼자 한 번 LSTM 모델을 만들어보고 싶었다.
그래서 구현해 본 모델을 기록해보고 싶어서 쓴다. 설명 시작!
1. 라이브러리 임포트
# 패키지 임포트
import os
import random
import numpy as np
import torch
import tensorflow as tf
import transformers
import gdown
# ...
2. 무작위 통제
# 무작위성 통제
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
# ...
3. 모델 제작
## 모델 학습
import pandas as pd
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from torch.utils.data import DataLoader
## gpu 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
## 토크나이저 정의
tokenizer = AutoTokenizer.from_pretrained('klue/bert-base')
## 처음 본 구글 드라이브 다운 라이브러리,, gdown
train_data = gdown.download("https://drive.google.com/file/d/1w1c2-COn89rxOVpyQSmctROSGP0aqadD/view?usp=share_link", fuzzy=True)
train_data = pd.read_csv(train_data)
# train_data = train_data.loc[:]
## preprocessing 코드
def preprocess(text):
tokens = tokenizer.tokenize(text)
max_length = 64
input_ids = torch.zeros([max_length], dtype=torch.int32)
input_token = tokenizer.convert_tokens_to_ids(tokens)
if len(input_token) > max_length:
input_token = input_token[:max_length]
input_ids[:len(input_token)] = torch.tensor(input_token, dtype=torch.int32)
return input_ids
seq_length = 64 ## 문장 길이 제한
input_ids = torch.zeros([len(train_data), seq_length], dtype=torch.int32).to(device)
labels = torch.zeros([len(train_data)], dtype=torch.int8).to(device)
## input_ids, labels 에 넣기
for i in range(len(train_data)):
if pd.isna(train_data['document'][i]): continue
input_ids[i] = preprocess(train_data['document'][i])
labels[i] = train_data['label'][i]
## Define the LSTM classification model
class LSTMClassificationModel(nn.Module):
## 우선 당연히 초기화 함수 쓰는데, 주는 인자는
## nn.embedding을 쓰기 위해 vocab_size/
## embedding 차원/
## hidden 차원(LSTM 레이어 간 넘기는)/
## output 차원/
## LSTM 레이어 개수
## 드롭아웃
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, dropout=dropout)
self.fc = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
## 그 다음 포워딩은, input_ids만 주면 되는데
def forward(self, input_ids):
## 임베딩 -> lstm 모델에 넣기 ->
embedded = self.dropout(self.embedding(input_ids))
packed_output, (hidden, cell) = self.lstm(embedded)
# [batch, seq_length, layer_direction(=1) * hidden_size] /
# [n_layers * layer_direction(=1), seq_length, hidden_size] /
# [n_layers * layer_direction(=1), seq_length, hidden_size]
hidden = self.dropout(packed_output[:, -1, :])
output = self.fc(hidden)
return torch.sigmoid(output)
# Hyperparameters
vocab_size = len(tokenizer.vocab)
embedding_dim = 256
hidden_dim = 128
output_dim = 1
n_layers = 3
dropout = 0.15
# Instantiate the LSTM classification model
classification_model = LSTMClassificationModel(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout)
classification_model = classification_model.to(device)
## 이진 분류를 위한 BCE 사용하고, 옵티마이저는 Adam이고 학습률은 2e-5
criterion = nn.BCELoss().to(device)
optimizer = torch.optim.Adam(classification_model.parameters(), lr=2e-5)
## 학습 코드
def train_model(model, dataloader):
model.train()
loss = 0.0
for batch in dataloader:
input_ids = batch['input_ids'].to(device)
labels = batch['labels'].to(device)
optimizer.zero_grad()
outputs = model(input_ids)
loss = criterion(outputs, labels.reshape(-1, 1).float())
loss.backward()
optimizer.step()
return loss
## 정의한 데이터셋 클래스!
class TrainDataset(torch.utils.data.Dataset):
def __init__(self, input_ids, labels):
self.input_ids = input_ids
self.labels = labels
def __getitem__(self, idx):
return {'input_ids': self.input_ids[idx].to(device),
'labels': self.labels[idx].to(device)}
def __len__(self):
return len(self.input_ids)
## 내가 가장 어려워하는 배치 단위로 넣어서 학습하기,, 어려워ㅠㅠ 뭔가 데이터로더는,,
train_dataset = TrainDataset(input_ids, labels)
train_loader = DataLoader(train_dataset, batch_size = 32, shuffle=False)
for epoch in range(5):
print('epoch: ', epoch)
loss = train_model(classification_model, train_loader)
## loss 계산하려면 cpu로 내려야함
print('loss : {:.4f}'.format(loss.to('cpu').detach().numpy()))
torch.save(classification_model.state_dict(), 'classification_model.pt')
- 추가) lstm 옵션에 bidirectional=True주면 bidirectional-lstm 된다
4. 학습 후 저장하고, 평가 위한 모델 로드하기
import gdown
import torch.nn as nn
import pandas
def load_model():
class LSTMClassificationModel(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, dropout=dropout)
self.fc = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
self.sm = nn.Softmax(1)
def forward(self, input_ids):
embedded = self.dropout(self.embedding(input_ids))
packed_output, (hidden, cell) = self.lstm(embedded)
hidden = self.dropout(packed_output[:, -1, :])
output = self.fc(hidden)
return self.sm(output)
# Hyperparameters
tokenizer = transformers.AutoTokenizer.from_pretrained('klue/bert-base')
vocab_size = len(tokenizer.vocab)
embedding_dim = 256
hidden_dim = 128
output_dim = 1
n_layers = 3
dropout = 0.1
# Instantiate the LSTM classification model
classification_model = LSTMClassificationModel(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout)
# 모델 가중치 로드
gdown.download("https://drive.google.com/file/d/1QCvy0dQ1jDe4icOuoq40W2lKzbzXp37J/view?usp=sharing", './classification_model.pt', fuzzy=True)
device = torch.device('cpu')
classification_model.load_state_dict(torch.load('./classification_model.pt', map_location=device), strict=False)
classification_model.eval()
return classification_model
5. 평가하기
# 예측 함수는 AIF에서 제공하는 X_test의 입력 정보와, 메모리에 올라간 사전 학습 모델을 입력 받습니다.
def predict(X_test, model):
def preprocess(text):
tokenizer = transformers.AutoTokenizer.from_pretrained('klue/bert-base')
tokens = tokenizer.tokenize(text)
max_length = 64
input_ids = torch.zeros([1, max_length], dtype=torch.int32)
input_token = tokenizer.convert_tokens_to_ids(tokens)
if len(input_token) > max_length:
input_token = input_token[:max_length]
input_ids[0, :len(input_token)] = torch.tensor(input_token, dtype=torch.int32)
return input_ids
result = []
with torch.no_grad():
for X in X_test:
if pandas.isna(X): continue
input_ids = preprocess(X)
print(model(input_ids))
if model(input_ids) > 0.5:
result.append(1)
else:
result.append(0)
return result
미숙한 코드긴하지만,, 그리고 궁금한게 학습이 크게 잘 안된다 왜지?
'ML\DL > Deep Learning' 카테고리의 다른 글
[DL] End-To-End Model (0) | 2023.03.14 |
---|