TimeSeriesTransformer Modeling
# Define Time2Vec layer
class Time2Vec(nn.Module):
def __init__(self, input_dim):
super(Time2Vec, self).__init__()
self.linear = nn.Linear(input_dim, 1)
self.periodic = nn.Linear(input_dim, input_dim-1)
def forward(self, x):
linear_out = self.linear(x)
periodic_out = torch.sin(self.periodic(x))
return torch.cat([linear_out, periodic_out], dim=-1)
# Define Transformer Encoder Block
class TransformerBlock(nn.Module):
def __init__(self, input_dim, num_heads, dropout):
super(TransformerBlock, self).__init__()
self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)
self.norm1 = nn.LayerNorm(input_dim)
self.norm2 = nn.LayerNorm(input_dim)
self.ff = nn.Sequential(
nn.Linear(input_dim, 4 * input_dim),
nn.GELU(),
nn.Linear(4 * input_dim, input_dim),
nn.Dropout(dropout)
)
def forward(self, x):
attended, _ = self.attention(x, x, x)
x = self.norm1(attended + x)
feedforward = self.ff(x)
return self.norm2(feedforward + x)
# Define main model architecture
class TimeSeriesTransformer(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_heads, output_size, dropout):
super(TimeSeriesTransformer, self).__init__()
self.time2vec = Time2Vec(input_size)
self.embedding = nn.Linear(input_size, hidden_size)
self.position_encoding = self.generate_position_encoding(hidden_size, 10)
self.dropout = nn.Dropout(dropout)
self.transformer_blocks = nn.ModuleList([
TransformerBlock(hidden_size, num_heads, dropout)
for _ in range(num_layers)
])
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, output_size)
)
def generate_position_encoding(self, hidden_size, max_len):
pe = torch.zeros(max_len, hidden_size)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return pe.unsqueeze(0)
def forward(self, x):
b, s, f = x.shape
x = self.time2vec(x)
x = self.embedding(x)
x = x + self.position_encoding[:, :s, :].to(x.device)
x = self.dropout(x)
for transformer in self.transformer_blocks:
x = transformer(x)
x = x.mean(dim=1)
return self.output_layer(x)
# Define custom dataset class
class AgriculturePriceDataset(Dataset):
def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
self.data = dataframe
self.window_size = window_size
self.prediction_length = prediction_length
self.is_test = is_test
self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
self.sequences = []
if not self.is_test:
for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values
y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values
self.sequences.append((x, y))
else:
self.sequences = [self.data[self.numeric_columns].values]
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
if not self.is_test:
x, y = self.sequences[idx]
return torch.FloatTensor(x), torch.FloatTensor(y)
else:
return torch.FloatTensor(self.sequences[idx])
# Training function with mixed precision training
def train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device):
model.train()
total_loss = 0
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
optimizer.zero_grad()
with autocast():
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
scheduler.step()
total_loss += loss.item()
return total_loss / len(train_loader)
# Evaluation function
def evaluate_model(model, test_loader, criterion, device):
model.eval()
total_loss = 0
with torch.no_grad():
for batch_x, batch_y in test_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
total_loss += loss.item()
return total_loss / len(test_loader)
# Main training loop
def train_and_predict(품목_리스트):
품목별_predictions = {}
품목별_scalers = {}
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pbar_outer = tqdm(품목_리스트, desc="품목 처리 중", position=0)
for 품목명 in pbar_outer:
pbar_outer.set_description(f"품목별 전처리 및 모델 학습 -> {품목명}")
# Data preprocessing
train_data, scaler = process_all_items_with_full_conditions("./train/train.csv",
"./train/meta/TRAIN_산지공판장_2018-2021.csv",
"./train/meta/TRAIN_전국도매_2018-2021.csv",
품목명)
품목별_scalers[품목명] = scaler
# Dataset and DataLoader setup
dataset = AgriculturePriceDataset(train_data)
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_data, CFG.batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_data, CFG.batch_size, shuffle=False, num_workers=4)
# Model setup
input_size = len(dataset.numeric_columns)
model = TimeSeriesTransformer(
input_size=input_size,
hidden_size=CFG.hidden_size,
num_layers=CFG.num_layers,
num_heads=CFG.num_heads,
output_size=CFG.output_size,
dropout=CFG.dropout
).to(device)
# Training setup
criterion = nn.HuberLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay)
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
scaler = GradScaler()
# Training loop
best_val_loss = float('inf')
os.makedirs('models', exist_ok=True)
for epoch in range(CFG.epoch):
train_loss = train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device)
val_loss = evaluate_model(model, val_loader, criterion, device)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), f'models/best_model_{품목명}.pth')
print(f'Epoch {epoch+1}/{CFG.epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
# Inference
품목_predictions = []
pbar_inner = tqdm(range(25), desc="테스트 파일 추론 중", position=1, leave=False)
for i in pbar_inner:
test_file = f"./test/TEST_{i:02d}.csv"
산지공판장_file = f"./test/meta/TEST_산지공판장_{i:02d}.csv"
전국도매_file = f"./test/meta/TEST_전국도매_{i:02d}.csv"
test_data, _ = process_data(test_file, 산지공판장_file, 전국도매_file, 품목명, scaler=품목별_scalers[품목명])
test_dataset = AgriculturePriceDataset(test_data, is_test=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
model.eval()
predictions = []
with torch.no_grad():
for batch in test_loader:
batch = batch.to(device)
output = model(batch)
predictions.append(output.cpu().numpy())
predictions_array = np.concatenate(predictions)
# Inverse transform predictions
price_column_index = test_data.columns.get_loc(test_dataset.price_column)
predictions_reshaped = predictions_array.reshape(-1, 1)
price_scaler = MinMaxScaler()
price_scaler.min_ = 품목별_scalers[품목명].min_[price_column_index]
price_scaler.scale_ = 품목별_scalers[품목명].scale_[price_column_index]
predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped)
if np.isnan(predictions_original_scale).any():
pbar_inner.set_postfix({"상태": "NaN"})
else:
pbar_inner.set_postfix({"상태": "정상"})
품목_predictions.extend(predictions_original_scale.flatten())
품목별_predictions[품목명] = 품목_predictions
pbar_outer.update(1)
return 품목별_predictions
# Run training and prediction
품목별_predictions = train_and_predict(품목_리스트)
# Prepare submission file
sample_submission = pd.read_csv('./sample_submission.csv')
for 품목명, predictions in 품목별_predictions.items():
sample_submission[품목명] = predictions
# Save results
sample_submission.to_csv('./optimized_submission.csv', index=False)
Time2Vec Layer의 개념
Time2Vec는 기본적으로 시간 데이터를 벡터로 변환하여 입력에 추가하는 레이어이다. 여기서 시간은 단순한 선형적인 변화뿐만 아니라, 주기적 패턴(예: 주기적 이벤트, 계절적 변화)을 포착하는 것이 중요하다. 따라서 Time2Vec는 선형 변환과 **주기적 변환(사인 함수)**을 모두 사용하여 시간을 벡터로 표현한다.
class Time2Vec(nn.Module):
def __init__(self, input_dim):
super(Time2Vec, self).__init__()
# 선형 변환을 위한 레이어: 시간 정보를 1차원으로 변환
self.linear = nn.Linear(input_dim, 1)
# 주기적 변환을 위한 레이어: 시간 정보를 input_dim - 1 크기의 벡터로 변환
self.periodic = nn.Linear(input_dim, input_dim - 1)
def forward(self, x):
# 선형 변환 결과 (일차적으로 시간 값을 그대로 모델에 전달)
linear_out = self.linear(x)
# 주기적 변환 결과 (사인 함수를 통해 시간의 주기성을 반영)
periodic_out = torch.sin(self.periodic(x))
# 선형 변환 결과와 주기적 변환 결과를 결합하여 최종 Time2Vec 출력 생성
return torch.cat([linear_out, periodic_out], dim=-1)
Time2Vec Layer의 역할
- Time2Vec 레이어는 시간 정보를 더 풍부하게 표현하는 역할을 한다. 시계열 데이터의 시간 정보는 단순한 숫자(예: 시간, 일, 분 등)일 수 있지만, 이러한 시간 정보는 주기적 변화(예: 하루, 일주일, 계절 등)와 관련이 있을 수 있다. Time2Vec는 이러한 주기적 패턴을 모델이 학습할 수 있도록 한다.
- 선형 변환: 시간을 단순히 선형적으로 변환하는 부분이다. 이 부분은 시간의 일차적인 흐름을 표현한다. 시간의 흐름을 그대로 모델에 전달하는 역할을 한다.
- 주기적 변환: 사인 함수를 사용하여 주기성을 반영하는 부분이다. 예를 들어, 하루의 시간대나 계절적인 변화와 같은 주기적인 특성을 포착할 수 있다. 이 부분은 주기적인 패턴을 학습하는 데 유용하다.
forward 함수 -> 모델의 "순전파" 정의
TransformerBlock
TransformerBlock은 트랜스포머 아키텍처에서 중요한 요소 중 하나로, 기본적으로 입력 시퀀스의 관계를 학습하여 중요한 정보를 추출하는 역할을 한다. 이 블록은 멀티헤드 어텐션, 레이어 정규화, 그리고 피드포워드 네트워크로 구성되어 있다. 트랜스포머는 이러한 구조를 통해 입력 데이터의 패턴을 효과적으로 학습하고, 시퀀스 내의 장기적인 의존성을 파악할 수 있다.
class TransformerBlock(nn.Module):
def __init__(self, input_dim, num_heads, dropout):
super(TransformerBlock, self).__init__()
# 멀티헤드 어텐션 모듈
self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)
# 어텐션 이후의 첫 번째 레이어 정규화
self.norm1 = nn.LayerNorm(input_dim)
# 피드포워드 네트워크 이후의 두 번째 레이어 정규화
self.norm2 = nn.LayerNorm(input_dim)
# 피드포워드 네트워크 (두 개의 선형 레이어와 활성화 함수 GELU로 구성)
self.ff = nn.Sequential(
nn.Linear(input_dim, 4 * input_dim), # 첫 번째 선형 레이어
nn.GELU(), # 활성화 함수
nn.Linear(4 * input_dim, input_dim), # 두 번째 선형 레이어
nn.Dropout(dropout) # 드롭아웃으로 과적합 방지
)
def forward(self, x):
# 1. 어텐션 레이어
# Q(쿼리), K(키), V(밸류) 모두 같은 x를 사용하여 멀티헤드 어텐션 수행
attended, _ = self.attention(x, x, x)
# 2. 어텐션 결과와 입력을 더하고, 레이어 정규화 적용
x = self.norm1(attended + x)
# 3. 피드포워드 네트워크를 통과한 결과와 다시 입력을 더하고, 레이어 정규화 적용
feedforward = self.ff(x)
return self.norm2(feedforward + x)
self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)
- 멀티헤드 어텐션은 트랜스포머 모델의 핵심 개념으로, 입력 시퀀스 내의 각 요소가 다른 요소와 어떻게 관계를 맺고 있는지 학습하는 메커니즘이다.
- 쿼리(Q), 키(K), 밸류(V) 세 가지 요소를 기반으로 어텐션 스코어를 계산하며, Q, K, V는 모두 동일한 입력 데이터(x)에서 생성된다.
- 멀티헤드 어텐션에서 **쿼리(Q)**는 "내가 다른 토큰들과 얼마나 관련이 있는가?"를 묻는 역할, **키(K)**는 "내가 그 질문과 얼마나 일치하는가?"에 대한 답변, **밸류(V)**는 그 토큰의 실제 정보를 나타낸다. 쿼리와 키를 사용해 어텐션 스코어를 계산하고, 이 스코어로 밸류를 가중합하여 최종 출력을 생성한다. 멀티헤드 어텐션은 이를 여러 번 병렬로 수행해 다양한 패턴을 학습할 수 있게 한다.
- 여러 개의 어텐션 헤드를 사용하여, 데이터 내의 다양한 관계(패턴)를 병렬로 학습할 수 있다.
- 각 헤드는 독립적으로 어텐션을 계산한 후, 이를 결합하여 최종 결과를 도출한다. 이는 다양한 각도에서 데이터 관계를 학습할 수 있도록 도와준다.
- dropout을 사용하여 모델의 과적합을 방지한다.
self.norm1 = nn.LayerNorm(input_dim)
self.norm2 = nn.LayerNorm(input_dim)
- 레이어 정규화는 각 층의 출력이 일정한 분포를 유지할 수 있도록 정규화하는 과정이다. 이는 학습을 더 안정적으로 만들어 주며, 네트워크가 더 잘 수렴하도록 돕는다.
- self.norm1은 멀티헤드 어텐션 블록에서 나온 결과와 원본 입력을 더한 값에 대해 적용된다. 이는 Residual Connection(잔차 연결) 후에 바로 적용된다.
- self.norm2는 피드포워드 네트워크를 통과한 결과에 적용된다.
self.ff = nn.Sequential(
nn.Linear(input_dim, 4 * input_dim), # 첫 번째 선형 레이어
nn.GELU(), # 활성화 함수
nn.Linear(4 * input_dim, input_dim), # 두 번째 선형 레이어
nn.Dropout(dropout) # 드롭아웃으로 과적합 방지
)
- *피드포워드 네트워크(Feedforward Neural Network, FFN)**는 트랜스포머 모델에서 입력 데이터가 한 방향으로만 흐르며 레이어를 통과해 최종 출력을 계산하는 네트워크
- 피드포워드 네트워크는 두 개의 선형 레이어로 구성되어 있으며, 중간에 GELU라는 활성화 함수가 사용된다.
- 첫 번째 선형 레이어는 입력 차원(input_dim)을 4배로 확장한 후, GELU 활성화 함수를 거친다.
- 두 번째 선형 레이어는 다시 원래의 차원(input_dim)으로 줄어들게 된다.
- 이 과정은 입력 데이터에 대해 비선형 변환을 적용하여, 더 복잡한 패턴을 학습할 수 있도록 돕는다.
- Dropout은 네트워크가 과적합되지 않도록 일부 뉴런을 무작위로 학습하지 않도록 하는 정규화 기법이다.
TimeSeriesTransformer
TimeSeriesTransformer는 시계열 데이터를 처리하여 미래의 가격을 예측하는 모델로, Time2Vec 레이어, 트랜스포머 블록들, 그리고 **위치 인코딩(Position Encoding)**을 사용하여 모델이 시간의 흐름과 시퀀스 내 패턴을 학습하도록 설계되어 있다. 이 모델은 주로 금융 데이터나 농산물 가격 예측과 같은 시계열 데이터에 유용하다.
class TimeSeriesTransformer(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_heads, output_size, dropout):
super(TimeSeriesTransformer, self).__init__()
# Time2Vec 레이어
self.time2vec = Time2Vec(input_size)
# 입력 데이터를 임베딩(차원 축소)하는 선형 레이어
self.embedding = nn.Linear(input_size, hidden_size)
# 위치 인코딩을 생성하는 함수 호출
self.position_encoding = self.generate_position_encoding(hidden_size, 10)
# 드롭아웃 레이어 (과적합 방지)
self.dropout = nn.Dropout(dropout)
# 트랜스포머 블록 여러 개 쌓기 (ModuleList를 사용하여 리스트로 생성)
self.transformer_blocks = nn.ModuleList([
TransformerBlock(hidden_size, num_heads, dropout)
for _ in range(num_layers)
])
# 출력층: 예측을 위한 선형 레이어와 활성화 함수
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, output_size)
)
def generate_position_encoding(self, hidden_size, max_len):
# 위치 인코딩 생성
pe = torch.zeros(max_len, hidden_size)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return pe.unsqueeze(0)
def forward(self, x):
# 입력 데이터의 배치 크기(b), 시퀀스 길이(s), 특성 수(f)를 추출
b, s, f = x.shape
# 1. Time2Vec 레이어로 시간 정보를 추가한 벡터 생성
x = self.time2vec(x)
# 2. 입력 데이터를 임베딩 (차원 축소)
x = self.embedding(x)
# 3. 위치 인코딩을 더해줌으로써 시계열 데이터의 순서 정보를 추가
x = x + self.position_encoding[:, :s, :].to(x.device)
# 4. 드롭아웃을 적용하여 과적합 방지
x = self.dropout(x)
# 5. 트랜스포머 블록을 여러 번 거침
for transformer in self.transformer_blocks:
x = transformer(x)
# 6. 시퀀스의 평균값을 계산하여 하나의 벡터로 변환 (차원 축소)
x = x.mean(dim=1)
# 7. 최종 출력 레이어를 통해 예측값 도출
return self.output_layer(x)
주요 구성 요소 설명
- Time2Vec 레이어
- 시계열 데이터에서 시간을 벡터로 변환하는 역할을 한다.
- 선형 변환과 주기적 변환(사인 함수)을 사용하여 시간 정보를 더 효과적으로 모델에 반영한다.
self.time2vec = Time2Vec(input_size)
2. Embedding Layer (임베딩 레이어)
연속적인 벡터로 변환
- 입력 데이터를 차원 축소하여 모델이 더 쉽게 처리할 수 있도록 한다.
- 입력의 크기인 input_size에서 모델의 내부 크기인 hidden_size로 변환하는 역할을 한다.
self.embedding = nn.Linear(input_size, hidden_size)
3.Position Encoding (위치 인코딩)
- 시계열 데이터에서 각 데이터 포인트가 시간적으로 어디에 위치해 있는지를 모델이 인식할 수 있도록 돕는 기법이다.
- generate_position_encoding 함수는 **사인(sin)**과 코사인(cos) 함수를 사용하여 위치 인코딩을 계산한다.
def generate_position_encoding(self, hidden_size, max_len):
pe = torch.zeros(max_len, hidden_size)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return pe.unsqueeze(0)
여기서 position은 시퀀스 내의 각 요소의 위치이며, 이를 사인과 코사인으로 변환해 위치 정보를 벡터화한다. 이 정보를 모델에 더해줌으로써 모델이 시퀀스의 순서 정보를 학습할 수 있다.
- 이렇게 계산된 위치 인코딩은 입력 데이터 x에 더해진다:
div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
*위치 인코딩(position encoding)**을 생성하는 과정에서, 각 위치에 고유한 값을 부여하기 위해 사용되는 수식이다.
x = x + self.position_encoding[:, :s, :].to(x.device)
4. Transformer Blocks (트랜스포머 블록)
- 여러 개의 TransformerBlock을 쌓아서, 시퀀스 내 패턴을 학습한다. 각 블록은 멀티헤드 어텐션과 피드포워드 네트워크로 구성되어 있으며, 입력 데이터를 반복적으로 처리한다.
- ModuleList를 사용하여 여러 개의 트랜스포머 블록을 정의한다.
self.transformer_blocks = nn.ModuleList([
TransformerBlock(hidden_size, num_heads, dropout)
for _ in range(num_layers)
])
forward 함수에서는 각 트랜스포머 블록을 반복적으로 적용하여 입력 데이터를 처리한다:
for transformer in self.transformer_blocks:
x = transformer(x)
5. Global Average Pooling (시퀀스의 평균 계산)
- 트랜스포머 블록을 거친 후, 각 시퀀스의 요소들에 대해 평균을 계산하여 하나의 벡터로 변환한다. 이는 시퀀스의 중요한 정보를 하나의 벡터에 모으는 역할을 한다.
x = x.mean(dim=1)
6. Output Layer (출력 레이어)
- 최종 예측을 위한 출력층으로, ReLU 활성화 함수와 드롭아웃을 거친 후, 최종적으로 output_size 크기의 예측 값을 도출한다.
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, output_size)
)
AgriculturePriceDataset (데이터 분할)
AgriculturePriceDataset 클래스는 농산물 가격 예측을 위한 데이터를 생성하고, 훈련 데이터와 테스트 데이터를 다르게 처리하여 모델에 입력할 수 있도록 하는 사용자 정의 데이터셋 클래스이다. 주어진 시계열 데이터를 일정한 크기(윈도우 크기)로 나누어 모델이 학습할 수 있도록 준비하며, 예측하려는 목표 값(타겟 값)도 함께 제공한다.
class AgriculturePriceDataset(Dataset):
def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
# 데이터프레임과 윈도우 크기, 예측 길이, 테스트 모드 여부를 받아 초기화
self.data = dataframe
self.window_size = window_size
self.prediction_length = prediction_length
self.is_test = is_test
# 가격 정보가 포함된 컬럼 찾기
self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
# 숫자형 데이터를 포함한 모든 열을 리스트로 저장
self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
# 데이터셋 생성 (훈련용 또는 테스트용 데이터에 따라 다르게 처리)
self.sequences = []
if not self.is_test:
# 훈련 데이터일 경우
for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values # 입력 데이터 (x)
y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values # 예측할 타겟 값 (y)
self.sequences.append((x, y))
else:
# 테스트 데이터일 경우 전체 데이터를 시퀀스로 처리
self.sequences = [self.data[self.numeric_columns].values]
def __len__(self):
# 데이터셋의 길이를 반환 (훈련 데이터의 경우 시퀀스의 개수, 테스트 데이터는 하나의 시퀀스)
return len(self.sequences)
def __getitem__(self, idx):
# 데이터셋에서 인덱스를 기준으로 샘플을 반환
if not self.is_test:
x, y = self.sequences[idx]
return torch.FloatTensor(x), torch.FloatTensor(y)
else:
return torch.FloatTensor(self.sequences[idx])
주요 구성 요소 설명
1. __init__ 메서드 (초기화)
def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
# 데이터프레임과 윈도우 크기, 예측 길이, 테스트 모드 여부를 받아 초기화
self.data = dataframe
self.window_size = window_size
self.prediction_length = prediction_length
self.is_test = is_test
- dataframe: 학습 또는 테스트에 사용할 데이터가 포함된 pandas 데이터프레임이다. 이 데이터프레임에는 다양한 특성과 농산물의 가격 데이터가 포함되어 있다.
- window_size: 입력 데이터의 시퀀스 길이를 나타낸다. 예를 들어, window_size=9이면 과거 9일치 데이터를 입력으로 사용하여 이후 며칠간의 가격을 예측한다.
- prediction_length: 예측할 타겟 값(미래의 가격)의 길이를 의미한다. 예를 들어, prediction_length=3이면 3일 후의 가격을 예측한다.
- is_test: 테스트 데이터인지 여부를 나타내는 플래그다. 테스트 데이터일 경우, 타겟 값이 없기 때문에 시퀀스만 생성한다.
self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
- price_column: 데이터프레임에서 평균 가격 정보를 포함하는 컬럼을 자동으로 찾아 설정한다. 이 컬럼이 타겟 값이 된다.
- numeric_columns: 데이터프레임에서 숫자형 데이터를 포함하는 모든 열을 리스트로 저장한다. 이 열들이 모델의 입력으로 사용된다.
# 데이터셋 생성 (훈련용 또는 테스트용 데이터에 따라 다르게 처리)
self.sequences = []
if not self.is_test:
# 훈련 데이터일 경우
for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values # 입력 데이터 (x)
y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values # 예측할 타겟 값 (y)
self.sequences.append((x, y))
- 입력 데이터(x): window_size 크기만큼 데이터를 슬라이딩 윈도우 방식으로 자른다. 각 시퀀스는 self.numeric_columns에 포함된 숫자형 데이터를 사용하여 입력으로 처리된다.
- 타겟 값(y): prediction_length 크기만큼 이후의 가격 데이터를 타겟 값으로 설정한다.
- 이 과정을 통해 훈련 데이터는 입력과 타겟 값이 쌍으로 저장된 시퀀스로 구성된다.
else:
# 테스트 데이터일 경우 전체 데이터를 시퀀스로 처리
self.sequences = [self.data[self.numeric_columns].values]
테스트 데이터일 경우 타겟 값은 필요하지 않기 때문에, 전체 데이터를 시퀀스로 저장한다. 여기서는 테스트 데이터를 예측하기 위한 입력만 준비한다.
def __len__(self):
# 데이터셋의 길이를 반환 (훈련 데이터의 경우 시퀀스의 개수, 테스트 데이터는 하나의 시퀀스)
return len(self.sequences)
데이터셋의 길이를 반환한다. 훈련 데이터일 경우, 생성된 시퀀스의 개수를 반환하며, 테스트 데이터는 하나의 시퀀스만 존재할 수 있다.
def __getitem__(self, idx):
if not self.is_test:
x, y = self.sequences[idx]
return torch.FloatTensor(x), torch.FloatTensor(y)
else:
return torch.FloatTensor(self.sequences[idx])
- 데이터셋에서 주어진 인덱스 idx에 해당하는 데이터를 반환한다.
- 훈련 데이터: 입력 시퀀스 x와 타겟 값 y를 반환한다.
- 테스트 데이터: 타겟 값 없이 입력 시퀀스 x만 반환한다.
- 반환되는 값은 torch.FloatTensor로 변환되어 PyTorch 모델에서 사용할 수 있는 형태로 제공된다.
요약
- **AgriculturePriceDataset**은 주어진 데이터에서 일정한 기간 동안의 입력과 그 이후의 타겟 값을 쌍으로 만들어 모델 학습에 필요한 데이터를 제공한다.
- 훈련 데이터와 테스트 데이터를 다르게 처리하며, 훈련 데이터의 경우 입력과 타겟을, 테스트 데이터의 경우 입력만을 반환한다.
- 이를 통해 시계열 데이터를 모델에 효율적으로 제공할 수 있으며, 특히 농산물 가격 예측과 같은 시계열 예측 문제에서 사용된다.
5. Training and Evaluation
train_model 함수는 모델을 훈련시키고, **혼합 정밀도 학습(Mixed Precision Training)**을 사용하여 메모리 사용량을 최적화하고 훈련 속도를 향상시킨다.
evaluate_model 함수는 검증 데이터셋을 사용하여 모델의 성능을 평가하는 역할을 한다.
train_model 함수
이 함수는 모델을 학습시키기 위해 배치 단위로 데이터를 받아, 모델의 예측 결과와 실제 값의 차이를 기반으로 손실을 계산한 후, 이를 바탕으로 모델의 가중치를 업데이트하는 과정이다.
# Training function with mixed precision training
def train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device):
model.train() # 모델을 학습 모드로 설정
total_loss = 0 # 총 손실 초기화
# 미니배치 학습
for batch_x, batch_y in train_loader:
# 배치를 GPU 또는 CPU로 이동
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
# 옵티마이저의 기울기 초기화
optimizer.zero_grad()
# 혼합 정밀도 학습을 위한 autocast 사용
with autocast():
# 모델을 통해 예측값 생성
outputs = model(batch_x)
# 예측값과 실제 값의 손실 계산
loss = criterion(outputs, batch_y)
# 손실을 역전파 (backpropagation) 하여 모델의 가중치 업데이트
scaler.scale(loss).backward() # 손실 값에 대한 스케일링 적용
scaler.step(optimizer) # 스케일링된 손실 값으로 옵티마이저를 사용해 가중치 업데이트
scaler.update() # 스케일러 업데이트
# 스케줄러를 사용해 학습률 조정
scheduler.step()
# 배치의 손실을 총 손실에 더함
total_loss += loss.item()
# 평균 손실 반환
return total_loss / len(train_loader)
설명
- model.train():
모델을 학습 모드로 전환한다. 이 설정은 드롭아웃과 배치 정규화 등의 레이어가 학습 모드에서 동작하도록 한다. - 데이터 로드 및 장치로 이동:
미니배치 단위로 데이터를 train_loader에서 받아, device(GPU 또는 CPU)로 이동시킨다.
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
3.혼합 정밀도 학습 (autocast):
**autocast()**는 혼합 정밀도 학습을 가능하게 하는 컨텍스트 매니저이다. 이를 통해 일부 연산이 FP16(반 정밀도)로 처리되어 메모리 효율성과 학습 속도가 향상된다.
with autocast():
outputs = model(batch_x) # 모델 예측
loss = criterion(outputs, batch_y) # 손실 계산
4.손실 역전파 및 옵티마이저 업데이트:
scaler.scale(loss)는 손실 값을 스케일링하여, 낮은 정밀도의 연산에서 발생할 수 있는 수치 불안정을 방지한다. 이후 backward()를 통해 손실을 역전파하고, 옵티마이저로 가중치를 업데이트한다.
손실역전파 : 딥러닝 모델에서 손실 함수를 기준으로 가중치를 업데이트하기 위해 사용하는 알고리즘
scaler.scale(loss).backward() # 손실 역전파
scaler.step(optimizer) # 옵티마이저 업데이트
scaler.update() # 스케일러 업데이트
학습률 스케줄링:
스케줄러는 학습률을 동적으로 조정하는 역할을 한다. 여기서는 CosineAnnealingWarmRestarts 같은 스케줄러를 사용해 학습률을 조정하며, 매 미니배치마다 학습률을 업데이트한다.
스케줄러 : 딥러닝 모델을 학습할 때 **학습률(learning rate)**을 동적으로 조절하는 방법을 말한다. 학습률은 가중치 업데이트의 크기를 결정하는 중요한 하이퍼파라미터로, 학습이 진행됨에 따라 학습률을 변화시키면 더 좋은 성능을 얻을 수 있다. 스케줄러는 학습 과정 중에 학습률을 어떻게 변화시킬지를 제어하는 역할
scheduler.step()
손실 계산:
각 미니배치의 손실을 누적하여, 총 손실을 계산하고 평균 손실을 반환한다.
return total_loss / len(train_loader)
evaluate_model 함수
evaluate_model 함수는 학습된 모델이 검증 데이터셋에서 어떻게 성능을 발휘하는지 평가하는 함수다. 이 함수는 모델을 평가 모드로 전환하고, 검증 데이터셋에서 예측을 수행한 뒤 손실 값을 계산한다.
# Evaluation function
def evaluate_model(model, test_loader, criterion, device):
model.eval() # 모델을 평가 모드로 설정
total_loss = 0 # 총 손실 초기화
with torch.no_grad(): # 평가에서는 기울기 계산을 하지 않음
for batch_x, batch_y in test_loader:
# 배치를 GPU 또는 CPU로 이동
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
# 모델을 통해 예측값 생성
outputs = model(batch_x)
# 예측값과 실제 값의 손실 계산
loss = criterion(outputs, batch_y)
# 배치의 손실을 총 손실에 더함
total_loss += loss.item()
# 평균 손실 반환
return total_loss / len(test_loader)
설명
- model.eval():
모델을 평가 모드로 전환한다. 이 모드는 드롭아웃과 배치 정규화 레이어가 평가 모드에서 동작하도록 하며, 학습 시에만 필요한 기울기 계산을 하지 않는다. - 기울기 비활성화 (torch.no_grad()):
평가 과정에서는 기울기 계산이 필요 없으므로 **torch.no_grad()**를 사용하여 메모리 사용량을 줄이고, 계산 속도를 높인다.
with torch.no_grad():
데이터 로드 및 장치로 이동:
검증 데이터(test_loader)에서 배치를 로드하여, device로 이동시킨다.
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
모델 예측 및 손실 계산:
모델의 예측 값을 생성한 후, 실제 값과의 손실을 계산하여 누적한다.
outputs = model(batch_x) # 예측값 생성
loss = criterion(outputs, batch_y) # 손실 계산
평균 손실 계산:
각 미니배치의 손실을 모두 더한 후, 검증 데이터 전체에 대한 평균 손실을 반환한다.
return total_loss / len(test_loader)
train_and_predict 함수
train_and_predict 함수는 농산물 품목 리스트에 대해 데이터를 전처리한 후, 각 품목별로 모델을 학습시키고, 테스트 데이터를 기반으로 가격을 예측하는 전체적인 학습 및 예측 파이프라인을 정의한 함수이다. 각 품목에 대해 모델을 학습시키며, 최적의 모델을 저장하고, 이를 바탕으로 예측 값을 생성한다.
# Main training loop
def train_and_predict(품목_리스트):
품목별_predictions = {} # 각 품목에 대한 예측 결과 저장
품목별_scalers = {} # 각 품목별로 사용된 스케일러 저장
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # GPU 사용 가능 시 CUDA 사용
pbar_outer = tqdm(품목_리스트, desc="품목 처리 중", position=0)
# 품목 리스트에 대해 반복
for 품목명 in pbar_outer:
pbar_outer.set_description(f"품목별 전처리 및 모델 학습 -> {품목명}")
# 1. 데이터 전처리 (품목별로 데이터 로드 및 스케일링 처리)
train_data, scaler = process_all_items_with_full_conditions(
"./train/train.csv",
"./train/meta/TRAIN_산지공판장_2018-2021.csv",
"./train/meta/TRAIN_전국도매_2018-2021.csv",
품목명
)
품목별_scalers[품목명] = scaler # 각 품목별로 스케일러 저장
# 2. Dataset 및 DataLoader 설정
dataset = AgriculturePriceDataset(train_data) # 전처리된 데이터를 Dataset으로 변환
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42) # 학습/검증 데이터 분리
train_loader = DataLoader(train_data, CFG.batch_size, shuffle=True, num_workers=4) # 학습 데이터 로더
val_loader = DataLoader(val_data, CFG.batch_size, shuffle=False, num_workers=4) # 검증 데이터 로더
# 3. 모델 설정
input_size = len(dataset.numeric_columns) # 입력 데이터의 크기
model = TimeSeriesTransformer(
input_size=input_size,
hidden_size=CFG.hidden_size,
num_layers=CFG.num_layers,
num_heads=CFG.num_heads,
output_size=CFG.output_size,
dropout=CFG.dropout
).to(device) # 모델을 GPU 또는 CPU로 이동
# 4. 손실 함수, 옵티마이저, 스케줄러 설정
criterion = nn.HuberLoss() # 손실 함수: HuberLoss
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay) # AdamW 옵티마이저
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2) # 학습률 스케줄러: CosineAnnealingWarmRestarts
scaler = GradScaler() # 혼합 정밀도 학습을 위한 GradScaler
# 5. 학습 루프
best_val_loss = float('inf') # 초기 최저 검증 손실 설정
os.makedirs('models', exist_ok=True) # 모델 저장을 위한 디렉토리 생성
for epoch in range(CFG.epoch): # 에포크 반복
train_loss = train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device) # 학습
val_loss = evaluate_model(model, val_loader, criterion, device) # 검증
# 최적의 검증 손실을 기록하고, 해당 모델 저장
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), f'models/best_model_{품목명}.pth') # 최적의 모델 저장
print(f'Epoch {epoch+1}/{CFG.epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
# 6. 테스트 데이터에 대한 추론 (Inference)
품목_predictions = [] # 각 품목에 대한 예측 결과 저장
pbar_inner = tqdm(range(25), desc="테스트 파일 추론 중", position=1, leave=False)
for i in pbar_inner:
# 테스트 데이터 파일 로드
test_file = f"./test/TEST_{i:02d}.csv"
산지공판장_file = f"./test/meta/TEST_산지공판장_{i:02d}.csv"
전국도매_file = f"./test/meta/TEST_전국도매_{i:02d}.csv"
# 7. 테스트 데이터 전처리
test_data, _ = process_data(test_file, 산지공판장_file, 전국도매_file, 품목명, scaler=품목별_scalers[품목명])
test_dataset = AgriculturePriceDataset(test_data, is_test=True) # 테스트 데이터셋 생성
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False) # 테스트 데이터 로더
model.eval() # 모델을 평가 모드로 전환
predictions = []
with torch.no_grad(): # 기울기 계산 비활성화 (추론 시에는 불필요)
for batch in test_loader:
batch = batch.to(device) # 배치를 GPU 또는 CPU로 이동
output = model(batch) # 모델로부터 예측값 생성
predictions.append(output.cpu().numpy()) # 예측 결과 저장
predictions_array = np.concatenate(predictions) # 예측 결과를 배열로 변환
# 8. 예측값을 원래 스케일로 역변환
price_column_index = test_data.columns.get_loc(test_dataset.price_column) # 가격 컬럼의 인덱스 찾기
predictions_reshaped = predictions_array.reshape(-1, 1)
price_scaler = MinMaxScaler() # 가격을 역변환할 스케일러
price_scaler.min_ = 품목별_scalers[품목명].min_[price_column_index] # 저장된 스케일러 값 복원
price_scaler.scale_ = 품목별_scalers[품목명].scale_[price_column_index]
predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped) # 원래 스케일로 복원
if np.isnan(predictions_original_scale).any():
pbar_inner.set_postfix({"상태": "NaN"}) # NaN이 발생하면 상태 출력
else:
pbar_inner.set_postfix({"상태": "정상"}) # 정상일 경우 상태 출력
품목_predictions.extend(predictions_original_scale.flatten()) # 예측 결과 저장
품목별_predictions[품목명] = 품목_predictions # 품목별 예측 결과 저장
pbar_outer.update(1) # 진행 바 업데이트
return 품목별_predictions # 모든 품목에 대한 예측 결과 반환
전처리 및 데이터 로딩:
- process_all_items_with_full_conditions 함수를 사용해 훈련 데이터를 전처리하고, AgriculturePriceDataset을 통해 데이터를 PyTorch의 Dataset으로 변환한다.
- train_test_split을 사용해 훈련 데이터와 검증 데이터를 나눈 후, 이를 DataLoader로 감싸서 배치 단위로 학습에 사용할 수 있도록 준비한다.
train_data, scaler = process_all_items_with_full_conditions(...)
dataset = AgriculturePriceDataset(train_data)
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_data, ...)
val_loader = DataLoader(val_data, ...)
모델 학습 및 검증:
- TimeSeriesTransformer 모델을 설정한 후, 학습 과정에서 사용될 손실 함수(HuberLoss), 옵티마이저(AdamW), 스케줄러(CosineAnnealingWarmRestarts), 그리고 혼합 정밀도 학습을 위한 GradScaler를 정의한다.
- 매 에포크마다 train_model 함수를 사용해 학습을 진행하고, evaluate_model 함수를 사용해 검증 데이터를 평가한다. 검증 손실이 가장 낮을 때 모델을 저장한다.
model = TimeSeriesTransformer(...)
criterion = nn.HuberLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay)
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
scaler = GradScaler()
for epoch in range(CFG.epoch):
train_loss = train_model(model, train_loader, ...)
val_loss = evaluate_model(model, val_loader, ...)
if val_loss < best_val_loss:
torch.save(model.state_dict(), f'models/best_model_{품목명}.pth')
테스트 데이터 추론 및 예측 값 저장:
- 테스트 데이터 파일을 로드한 후, process_data로 전처리하여 테스트 데이터를 생성한다.
- model.eval()을 호출하여 모델을 평가 모드로 전환한 후, 기울기 계산을 비활성화(torch.no_grad())하고, 테스트 데이터에 대한 예측을 수행한다.
- 예측된 값을 원래 스케일로 복원한 후, 최종 예측 값을 저장한다.
model.eval()
with torch.no_grad():
for batch in test_loader:
batch = batch.to(device)
output = model(batch)
predictions.append(output.cpu().numpy())
predictions_array = np.concatenate(predictions)
predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped)
품목_predictions.extend(predictions_original_scale.flatten())
결과 반환:
- 모든 품목에 대한 예측 결과를 품목별_predictions에 저장하고 반환한다.
return 품목별_predictions
1. Time2Vec Layer
- Time2Vec 레이어는 시간 정보를 벡터화하는 역할을 한다. 이는 Linear 변환과 주기적(sin) 변환으로 구성되어 있으며, 시간을 특성으로 입력받아 이를 다양한 주기로 변환하여 모델에 추가적인 정보를 제공한다.
2. Transformer Encoder Block
- TransformerBlock은 기본적인 트랜스포머의 인코더 블록을 정의한다. 멀티헤드 어텐션, 레이어 정규화, 피드포워드 네트워크를 포함하고 있으며, 입력 데이터의 패턴을 학습하여 중요한 정보를 추출한다.
3. Main Model (TimeSeriesTransformer)
- TimeSeriesTransformer 모델은 입력 데이터를 받아 Time2Vec 레이어로 처리한 후, 여러 개의 트랜스포머 블록을 거쳐 최종적으로 가격을 예측한다. 위치 인코딩(position encoding)을 사용하여 시계열 데이터의 순서를 모델이 인식할 수 있도록 한다.
4. Dataset Class (AgriculturePriceDataset)
- AgriculturePriceDataset 클래스는 데이터셋을 생성하는데 사용된다. 주어진 기간 동안의 농산물 가격 데이터를 윈도우 크기로 나누어 모델에 학습 데이터를 제공하며, 훈련용과 테스트용 데이터셋을 다르게 처리한다.
5. Training and Evaluation
- train_model 함수는 모델을 훈련시키는 함수이며, 혼합 정밀도 학습(mixed precision training)을 통해 성능을 최적화한다.
- evaluate_model 함수는 검증 데이터셋을 사용하여 모델의 성능을 평가한다.
6. Main Training Loop
- train_and_predict 함수는 품목 리스트에 대해 데이터를 전처리하고, 각 품목에 대해 모델을 학습시킨 후 테스트 데이터를 추론하여 예측값을 생성한다. 각 품목별로 최적의 모델을 저장하며, 이를 바탕으로 예측값을 저장한다.
7. Prediction and Submission
- 최종적으로, 예측된 값들을 sample_submission 파일에 저장하고 이를 제출 파일로 변환한다.