TimeSeriesTransformer Modeling

2024. 10. 7. 14:47클론 코딩

# Define Time2Vec layer
class Time2Vec(nn.Module):
    def __init__(self, input_dim):
        super(Time2Vec, self).__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.periodic = nn.Linear(input_dim, input_dim-1)
    
    def forward(self, x):
        linear_out = self.linear(x)
        periodic_out = torch.sin(self.periodic(x))
        return torch.cat([linear_out, periodic_out], dim=-1)

# Define Transformer Encoder Block
class TransformerBlock(nn.Module):
    def __init__(self, input_dim, num_heads, dropout):
        super(TransformerBlock, self).__init__()
        self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.ff = nn.Sequential(
            nn.Linear(input_dim, 4 * input_dim),
            nn.GELU(),
            nn.Linear(4 * input_dim, input_dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        attended, _ = self.attention(x, x, x)
        x = self.norm1(attended + x)
        feedforward = self.ff(x)
        return self.norm2(feedforward + x)

# Define main model architecture
class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_heads, output_size, dropout):
        super(TimeSeriesTransformer, self).__init__()
        self.time2vec = Time2Vec(input_size)
        self.embedding = nn.Linear(input_size, hidden_size)
        self.position_encoding = self.generate_position_encoding(hidden_size, 10)
        self.dropout = nn.Dropout(dropout)
        
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(hidden_size, num_heads, dropout) 
            for _ in range(num_layers)
        ])
        
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, output_size)
        )

    def generate_position_encoding(self, hidden_size, max_len):
        pe = torch.zeros(max_len, hidden_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(0)

    def forward(self, x):
        b, s, f = x.shape
        x = self.time2vec(x)
        x = self.embedding(x)
        x = x + self.position_encoding[:, :s, :].to(x.device)
        x = self.dropout(x)
        
        for transformer in self.transformer_blocks:
            x = transformer(x)
        
        x = x.mean(dim=1)
        return self.output_layer(x)

# Define custom dataset class
class AgriculturePriceDataset(Dataset):
    def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
        self.data = dataframe
        self.window_size = window_size
        self.prediction_length = prediction_length
        self.is_test = is_test
        
        self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
        self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
        
        self.sequences = []
        if not self.is_test:
            for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
                x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values
                y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values
                self.sequences.append((x, y))
        else:
            self.sequences = [self.data[self.numeric_columns].values]
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        if not self.is_test:
            x, y = self.sequences[idx]
            return torch.FloatTensor(x), torch.FloatTensor(y)
        else:
            return torch.FloatTensor(self.sequences[idx])

# Training function with mixed precision training
def train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device):
    model.train()
    total_loss = 0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        
        optimizer.zero_grad()
        
        with autocast():
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        scheduler.step()
        total_loss += loss.item()
    
    return total_loss / len(train_loader)

# Evaluation function
def evaluate_model(model, test_loader, criterion, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            total_loss += loss.item()
    return total_loss / len(test_loader)

# Main training loop
def train_and_predict(품목_리스트):
    품목별_predictions = {}
    품목별_scalers = {}
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    pbar_outer = tqdm(품목_리스트, desc="품목 처리 중", position=0)
    for 품목명 in pbar_outer:
        pbar_outer.set_description(f"품목별 전처리 및 모델 학습 -> {품목명}")
        
        # Data preprocessing
        train_data, scaler = process_all_items_with_full_conditions("./train/train.csv", 
                                         "./train/meta/TRAIN_산지공판장_2018-2021.csv", 
                                         "./train/meta/TRAIN_전국도매_2018-2021.csv", 
                                         품목명)
        품목별_scalers[품목명] = scaler
        
        # Dataset and DataLoader setup
        dataset = AgriculturePriceDataset(train_data)
        train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)
        train_loader = DataLoader(train_data, CFG.batch_size, shuffle=True, num_workers=4)
        val_loader = DataLoader(val_data, CFG.batch_size, shuffle=False, num_workers=4)

        # Model setup
        input_size = len(dataset.numeric_columns)
        model = TimeSeriesTransformer(
            input_size=input_size,
            hidden_size=CFG.hidden_size,
            num_layers=CFG.num_layers,
            num_heads=CFG.num_heads,
            output_size=CFG.output_size,
            dropout=CFG.dropout
        ).to(device)

        # Training setup
        criterion = nn.HuberLoss()
        optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay)
        scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
        scaler = GradScaler()

        # Training loop
        best_val_loss = float('inf')
        os.makedirs('models', exist_ok=True)

        for epoch in range(CFG.epoch):
            train_loss = train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device)
            val_loss = evaluate_model(model, val_loader, criterion, device)
            
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), f'models/best_model_{품목명}.pth')
            
            print(f'Epoch {epoch+1}/{CFG.epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
            
        # Inference
        품목_predictions = []
        pbar_inner = tqdm(range(25), desc="테스트 파일 추론 중", position=1, leave=False)
        
        for i in pbar_inner:
            test_file = f"./test/TEST_{i:02d}.csv"
            산지공판장_file = f"./test/meta/TEST_산지공판장_{i:02d}.csv"
            전국도매_file = f"./test/meta/TEST_전국도매_{i:02d}.csv"
            
            test_data, _ = process_data(test_file, 산지공판장_file, 전국도매_file, 품목명, scaler=품목별_scalers[품목명])
            test_dataset = AgriculturePriceDataset(test_data, is_test=True)
            test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

            model.eval()
            predictions = []
            with torch.no_grad():
                for batch in test_loader:
                    batch = batch.to(device)
                    output = model(batch)
                    predictions.append(output.cpu().numpy())
            
            predictions_array = np.concatenate(predictions)
            
            # Inverse transform predictions
            price_column_index = test_data.columns.get_loc(test_dataset.price_column)
            predictions_reshaped = predictions_array.reshape(-1, 1)
            
            price_scaler = MinMaxScaler()
            price_scaler.min_ = 품목별_scalers[품목명].min_[price_column_index]
            price_scaler.scale_ = 품목별_scalers[품목명].scale_[price_column_index]
            predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped)
            
            if np.isnan(predictions_original_scale).any():
                pbar_inner.set_postfix({"상태": "NaN"})
            else:
                pbar_inner.set_postfix({"상태": "정상"})
                품목_predictions.extend(predictions_original_scale.flatten())
                
        품목별_predictions[품목명] = 품목_predictions
        pbar_outer.update(1)
    
    return 품목별_predictions

# Run training and prediction
품목별_predictions = train_and_predict(품목_리스트)

# Prepare submission file
sample_submission = pd.read_csv('./sample_submission.csv')
for 품목명, predictions in 품목별_predictions.items():
    sample_submission[품목명] = predictions

# Save results
sample_submission.to_csv('./optimized_submission.csv', index=False)

 


Time2Vec Layer의 개념

Time2Vec는 기본적으로 시간 데이터를 벡터로 변환하여 입력에 추가하는 레이어이다. 여기서 시간은 단순한 선형적인 변화뿐만 아니라, 주기적 패턴(예: 주기적 이벤트, 계절적 변화)을 포착하는 것이 중요하다. 따라서 Time2Vec는 선형 변환과 **주기적 변환(사인 함수)**을 모두 사용하여 시간을 벡터로 표현한다.

class Time2Vec(nn.Module):
    def __init__(self, input_dim):
        super(Time2Vec, self).__init__()
        # 선형 변환을 위한 레이어: 시간 정보를 1차원으로 변환
        self.linear = nn.Linear(input_dim, 1)
        # 주기적 변환을 위한 레이어: 시간 정보를 input_dim - 1 크기의 벡터로 변환
        self.periodic = nn.Linear(input_dim, input_dim - 1)
    
    def forward(self, x):
        # 선형 변환 결과 (일차적으로 시간 값을 그대로 모델에 전달)
        linear_out = self.linear(x)
        # 주기적 변환 결과 (사인 함수를 통해 시간의 주기성을 반영)
        periodic_out = torch.sin(self.periodic(x))
        # 선형 변환 결과와 주기적 변환 결과를 결합하여 최종 Time2Vec 출력 생성
        return torch.cat([linear_out, periodic_out], dim=-1)

Time2Vec Layer의 역할

  • Time2Vec 레이어는 시간 정보를 더 풍부하게 표현하는 역할을 한다. 시계열 데이터의 시간 정보는 단순한 숫자(예: 시간, 일, 분 등)일 수 있지만, 이러한 시간 정보는 주기적 변화(예: 하루, 일주일, 계절 등)와 관련이 있을 수 있다. Time2Vec는 이러한 주기적 패턴을 모델이 학습할 수 있도록 한다.
  • 선형 변환: 시간을 단순히 선형적으로 변환하는 부분이다. 이 부분은 시간의 일차적인 흐름을 표현한다. 시간의 흐름을 그대로 모델에 전달하는 역할을 한다.
  • 주기적 변환: 사인 함수를 사용하여 주기성을 반영하는 부분이다. 예를 들어, 하루의 시간대나 계절적인 변화와 같은 주기적인 특성을 포착할 수 있다. 이 부분은 주기적인 패턴을 학습하는 데 유용하다.

forward 함수 -> 모델의 "순전파" 정의

TransformerBlock

TransformerBlock은 트랜스포머 아키텍처에서 중요한 요소 중 하나로, 기본적으로 입력 시퀀스의 관계를 학습하여 중요한 정보를 추출하는 역할을 한다. 이 블록은 멀티헤드 어텐션, 레이어 정규화, 그리고 피드포워드 네트워크로 구성되어 있다. 트랜스포머는 이러한 구조를 통해 입력 데이터의 패턴을 효과적으로 학습하고, 시퀀스 내의 장기적인 의존성을 파악할 수 있다.

class TransformerBlock(nn.Module):
    def __init__(self, input_dim, num_heads, dropout):
        super(TransformerBlock, self).__init__()
        # 멀티헤드 어텐션 모듈
        self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)
        # 어텐션 이후의 첫 번째 레이어 정규화
        self.norm1 = nn.LayerNorm(input_dim)
        # 피드포워드 네트워크 이후의 두 번째 레이어 정규화
        self.norm2 = nn.LayerNorm(input_dim)
        # 피드포워드 네트워크 (두 개의 선형 레이어와 활성화 함수 GELU로 구성)
        self.ff = nn.Sequential(
            nn.Linear(input_dim, 4 * input_dim),  # 첫 번째 선형 레이어
            nn.GELU(),  # 활성화 함수
            nn.Linear(4 * input_dim, input_dim),  # 두 번째 선형 레이어
            nn.Dropout(dropout)  # 드롭아웃으로 과적합 방지
        )

    def forward(self, x):
        # 1. 어텐션 레이어
        # Q(쿼리), K(키), V(밸류) 모두 같은 x를 사용하여 멀티헤드 어텐션 수행
        attended, _ = self.attention(x, x, x)
        # 2. 어텐션 결과와 입력을 더하고, 레이어 정규화 적용
        x = self.norm1(attended + x)
        # 3. 피드포워드 네트워크를 통과한 결과와 다시 입력을 더하고, 레이어 정규화 적용
        feedforward = self.ff(x)
        return self.norm2(feedforward + x)
self.attention = nn.MultiheadAttention(input_dim, num_heads, dropout=dropout)

 

  • 멀티헤드 어텐션은 트랜스포머 모델의 핵심 개념으로, 입력 시퀀스 내의 각 요소가 다른 요소와 어떻게 관계를 맺고 있는지 학습하는 메커니즘이다.
  • 쿼리(Q), 키(K), 밸류(V) 세 가지 요소를 기반으로 어텐션 스코어를 계산하며, Q, K, V는 모두 동일한 입력 데이터(x)에서 생성된다.
    • 멀티헤드 어텐션에서 **쿼리(Q)**는 "내가 다른 토큰들과 얼마나 관련이 있는가?"를 묻는 역할, **키(K)**는 "내가 그 질문과 얼마나 일치하는가?"에 대한 답변, **밸류(V)**는 그 토큰의 실제 정보를 나타낸다. 쿼리와 키를 사용해 어텐션 스코어를 계산하고, 이 스코어로 밸류를 가중합하여 최종 출력을 생성한다. 멀티헤드 어텐션은 이를 여러 번 병렬로 수행해 다양한 패턴을 학습할 수 있게 한다.
  • 여러 개의 어텐션 헤드를 사용하여, 데이터 내의 다양한 관계(패턴)를 병렬로 학습할 수 있다.
  • 각 헤드는 독립적으로 어텐션을 계산한 후, 이를 결합하여 최종 결과를 도출한다. 이는 다양한 각도에서 데이터 관계를 학습할 수 있도록 도와준다.
  • dropout을 사용하여 모델의 과적합을 방지한다.
self.norm1 = nn.LayerNorm(input_dim)
self.norm2 = nn.LayerNorm(input_dim)

 

 

  • 레이어 정규화는 각 층의 출력이 일정한 분포를 유지할 수 있도록 정규화하는 과정이다. 이는 학습을 더 안정적으로 만들어 주며, 네트워크가 더 잘 수렴하도록 돕는다.
  • self.norm1은 멀티헤드 어텐션 블록에서 나온 결과와 원본 입력을 더한 값에 대해 적용된다. 이는 Residual Connection(잔차 연결) 후에 바로 적용된다.
  • self.norm2는 피드포워드 네트워크를 통과한 결과에 적용된다.
self.ff = nn.Sequential(
    nn.Linear(input_dim, 4 * input_dim),  # 첫 번째 선형 레이어
    nn.GELU(),  # 활성화 함수
    nn.Linear(4 * input_dim, input_dim),  # 두 번째 선형 레이어
    nn.Dropout(dropout)  # 드롭아웃으로 과적합 방지
)
  • *피드포워드 네트워크(Feedforward Neural Network, FFN)**는 트랜스포머 모델에서 입력 데이터가 한 방향으로만 흐르며 레이어를 통과해 최종 출력을 계산하는 네트워크
  • 피드포워드 네트워크는 두 개의 선형 레이어로 구성되어 있으며, 중간에 GELU라는 활성화 함수가 사용된다.
  • 첫 번째 선형 레이어는 입력 차원(input_dim)을 4배로 확장한 후, GELU 활성화 함수를 거친다.
  • 두 번째 선형 레이어는 다시 원래의 차원(input_dim)으로 줄어들게 된다.
  • 이 과정은 입력 데이터에 대해 비선형 변환을 적용하여, 더 복잡한 패턴을 학습할 수 있도록 돕는다.
  • Dropout은 네트워크가 과적합되지 않도록 일부 뉴런을 무작위로 학습하지 않도록 하는 정규화 기법이다.

TimeSeriesTransformer

TimeSeriesTransformer는 시계열 데이터를 처리하여 미래의 가격을 예측하는 모델로, Time2Vec 레이어, 트랜스포머 블록들, 그리고 **위치 인코딩(Position Encoding)**을 사용하여 모델이 시간의 흐름과 시퀀스 내 패턴을 학습하도록 설계되어 있다. 이 모델은 주로 금융 데이터나 농산물 가격 예측과 같은 시계열 데이터에 유용하다.

 

class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_heads, output_size, dropout):
        super(TimeSeriesTransformer, self).__init__()
        # Time2Vec 레이어
        self.time2vec = Time2Vec(input_size)
        # 입력 데이터를 임베딩(차원 축소)하는 선형 레이어
        self.embedding = nn.Linear(input_size, hidden_size)
        # 위치 인코딩을 생성하는 함수 호출
        self.position_encoding = self.generate_position_encoding(hidden_size, 10)
        # 드롭아웃 레이어 (과적합 방지)
        self.dropout = nn.Dropout(dropout)
        
        # 트랜스포머 블록 여러 개 쌓기 (ModuleList를 사용하여 리스트로 생성)
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(hidden_size, num_heads, dropout) 
            for _ in range(num_layers)
        ])
        
        # 출력층: 예측을 위한 선형 레이어와 활성화 함수
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, output_size)
        )

    def generate_position_encoding(self, hidden_size, max_len):
        # 위치 인코딩 생성
        pe = torch.zeros(max_len, hidden_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(0)

    def forward(self, x):
        # 입력 데이터의 배치 크기(b), 시퀀스 길이(s), 특성 수(f)를 추출
        b, s, f = x.shape
        
        # 1. Time2Vec 레이어로 시간 정보를 추가한 벡터 생성
        x = self.time2vec(x)
        
        # 2. 입력 데이터를 임베딩 (차원 축소)
        x = self.embedding(x)
        
        # 3. 위치 인코딩을 더해줌으로써 시계열 데이터의 순서 정보를 추가
        x = x + self.position_encoding[:, :s, :].to(x.device)
        
        # 4. 드롭아웃을 적용하여 과적합 방지
        x = self.dropout(x)
        
        # 5. 트랜스포머 블록을 여러 번 거침
        for transformer in self.transformer_blocks:
            x = transformer(x)
        
        # 6. 시퀀스의 평균값을 계산하여 하나의 벡터로 변환 (차원 축소)
        x = x.mean(dim=1)
        
        # 7. 최종 출력 레이어를 통해 예측값 도출
        return self.output_layer(x)

주요 구성 요소 설명

  1. Time2Vec 레이어
    • 시계열 데이터에서 시간을 벡터로 변환하는 역할을 한다.
    • 선형 변환과 주기적 변환(사인 함수)을 사용하여 시간 정보를 더 효과적으로 모델에 반영한다.
self.time2vec = Time2Vec(input_size)

 

 

2. Embedding Layer (임베딩 레이어)

연속적인 벡터로 변환

  • 입력 데이터를 차원 축소하여 모델이 더 쉽게 처리할 수 있도록 한다.
  • 입력의 크기인 input_size에서 모델의 내부 크기인 hidden_size로 변환하는 역할을 한다.
self.embedding = nn.Linear(input_size, hidden_size)

 

3.Position Encoding (위치 인코딩)

  • 시계열 데이터에서 각 데이터 포인트시간적으로 어디에 위치해 있는지모델이 인식할 수 있도록 돕는 기법이다.
  • generate_position_encoding 함수는 **사인(sin)**과 코사인(cos) 함수를 사용하여 위치 인코딩을 계산한다.
def generate_position_encoding(self, hidden_size, max_len):
    pe = torch.zeros(max_len, hidden_size)
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    return pe.unsqueeze(0)

 

 

여기서 position은 시퀀스 내의 각 요소의 위치이며, 이를 사인코사인으로 변환해 위치 정보를 벡터화한다. 이 정보를 모델에 더해줌으로써 모델이 시퀀스의 순서 정보를 학습할 수 있다.

  • 이렇게 계산된 위치 인코딩은 입력 데이터 x에 더해진다:
div_term = torch.exp(torch.arange(0, hidden_size, 2).float() * (-np.log(10000.0) / hidden_size))

*위치 인코딩(position encoding)**을 생성하는 과정에서, 각 위치에 고유한 값을 부여하기 위해 사용되는 수식이다.

x = x + self.position_encoding[:, :s, :].to(x.device)

 

 

4. Transformer Blocks (트랜스포머 블록)

  • 여러 개의 TransformerBlock을 쌓아서, 시퀀스 내 패턴을 학습한다. 각 블록은 멀티헤드 어텐션 피드포워드 네트워크로 구성되어 있으며, 입력 데이터를 반복적으로 처리한다.
  • ModuleList를 사용하여 여러 개의 트랜스포머 블록을 정의한다.
self.transformer_blocks = nn.ModuleList([
    TransformerBlock(hidden_size, num_heads, dropout) 
    for _ in range(num_layers)
])

 

forward 함수에서는 각 트랜스포머 블록을 반복적으로 적용하여 입력 데이터를 처리한다:

for transformer in self.transformer_blocks:
    x = transformer(x)

 

5. Global Average Pooling (시퀀스의 평균 계산)

  • 트랜스포머 블록을 거친 후, 각 시퀀스의 요소들에 대해 평균을 계산하여 하나의 벡터로 변환한다. 이는 시퀀스의 중요한 정보를 하나의 벡터에 모으는 역할을 한다.
x = x.mean(dim=1)

 

6. Output Layer (출력 레이어)

  • 최종 예측을 위한 출력층으로, ReLU 활성화 함수와 드롭아웃을 거친 후, 최종적으로 output_size 크기의 예측 값을 도출한다.
self.output_layer = nn.Sequential(
    nn.Linear(hidden_size, hidden_size // 2),
    nn.ReLU(),
    nn.Dropout(dropout),
    nn.Linear(hidden_size // 2, output_size)
)

 

AgriculturePriceDataset (데이터 분할)

AgriculturePriceDataset 클래스는 농산물 가격 예측을 위한 데이터를 생성하고, 훈련 데이터테스트 데이터다르게 처리하여 모델에 입력할 수 있도록 하는 사용자 정의 데이터셋 클래스이다. 주어진 시계열 데이터를 일정한 크기(윈도우 크기)로 나누어 모델이 학습할 수 있도록 준비하며, 예측하려는 목표 값(타겟 값)도 함께 제공한다.

class AgriculturePriceDataset(Dataset):
    def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
        # 데이터프레임과 윈도우 크기, 예측 길이, 테스트 모드 여부를 받아 초기화
        self.data = dataframe
        self.window_size = window_size
        self.prediction_length = prediction_length
        self.is_test = is_test
        
        # 가격 정보가 포함된 컬럼 찾기
        self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
        # 숫자형 데이터를 포함한 모든 열을 리스트로 저장
        self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
        
        # 데이터셋 생성 (훈련용 또는 테스트용 데이터에 따라 다르게 처리)
        self.sequences = []
        if not self.is_test:
            # 훈련 데이터일 경우
            for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
                x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values  # 입력 데이터 (x)
                y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values  # 예측할 타겟 값 (y)
                self.sequences.append((x, y))
        else:
            # 테스트 데이터일 경우 전체 데이터를 시퀀스로 처리
            self.sequences = [self.data[self.numeric_columns].values]
    
    def __len__(self):
        # 데이터셋의 길이를 반환 (훈련 데이터의 경우 시퀀스의 개수, 테스트 데이터는 하나의 시퀀스)
        return len(self.sequences)
    
    def __getitem__(self, idx):
        # 데이터셋에서 인덱스를 기준으로 샘플을 반환
        if not self.is_test:
            x, y = self.sequences[idx]
            return torch.FloatTensor(x), torch.FloatTensor(y)
        else:
            return torch.FloatTensor(self.sequences[idx])

 

주요 구성 요소 설명

1. __init__ 메서드 (초기화)

 def __init__(self, dataframe, window_size=9, prediction_length=3, is_test=False):
        # 데이터프레임과 윈도우 크기, 예측 길이, 테스트 모드 여부를 받아 초기화
        self.data = dataframe
        self.window_size = window_size
        self.prediction_length = prediction_length
        self.is_test = is_test

 

  • dataframe: 학습 또는 테스트에 사용할 데이터가 포함된 pandas 데이터프레임이다. 이 데이터프레임에는 다양한 특성과 농산물의 가격 데이터가 포함되어 있다.
  • window_size: 입력 데이터의 시퀀스 길이를 나타낸다. 예를 들어, window_size=9이면 과거 9일치 데이터를 입력으로 사용하여 이후 며칠간의 가격을 예측한다.
  • prediction_length: 예측할 타겟 값(미래의 가격)의 길이를 의미한다. 예를 들어, prediction_length=3이면 3일 후의 가격을 예측한다.
  • is_test: 테스트 데이터인지 여부를 나타내는 플래그다. 테스트 데이터일 경우, 타겟 값이 없기 때문에 시퀀스만 생성한다.
self.price_column = [col for col in self.data.columns if '평균가격(원)' in col and len(col.split('_')) == 1][0]
self.numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()

 

 

  • price_column: 데이터프레임에서 평균 가격 정보를 포함하는 컬럼을 자동으로 찾아 설정한다. 이 컬럼이 타겟 값이 된다.
  • numeric_columns: 데이터프레임에서 숫자형 데이터를 포함하는 모든 열을 리스트로 저장한다. 이 열들이 모델의 입력으로 사용된다.

 

 # 데이터셋 생성 (훈련용 또는 테스트용 데이터에 따라 다르게 처리)
        self.sequences = []
        if not self.is_test:
            # 훈련 데이터일 경우
            for i in range(len(self.data) - self.window_size - self.prediction_length + 1):
                x = self.data[self.numeric_columns].iloc[i:i+self.window_size].values  # 입력 데이터 (x)
                y = self.data[self.price_column].iloc[i+self.window_size:i+self.window_size+self.prediction_length].values  # 예측할 타겟 값 (y)
                self.sequences.append((x, y))

 

  • 입력 데이터(x): window_size 크기만큼 데이터를 슬라이딩 윈도우 방식으로 자른다. 각 시퀀스는 self.numeric_columns에 포함된 숫자형 데이터를 사용하여 입력으로 처리된다.
  • 타겟 값(y): prediction_length 크기만큼 이후의 가격 데이터를 타겟 값으로 설정한다.
  • 이 과정을 통해 훈련 데이터는 입력과 타겟 값이 쌍으로 저장된 시퀀스로 구성된다.
else:
            # 테스트 데이터일 경우 전체 데이터를 시퀀스로 처리
            self.sequences = [self.data[self.numeric_columns].values]

 

 

테스트 데이터일 경우 타겟 값은 필요하지 않기 때문에, 전체 데이터를 시퀀스로 저장한다. 여기서는 테스트 데이터를 예측하기 위한 입력만 준비한다.

 

def __len__(self):
        # 데이터셋의 길이를 반환 (훈련 데이터의 경우 시퀀스의 개수, 테스트 데이터는 하나의 시퀀스)
        return len(self.sequences)

 

데이터셋의 길이를 반환한다. 훈련 데이터일 경우, 생성된 시퀀스의 개수를 반환하며, 테스트 데이터는 하나의 시퀀스만 존재할 수 있다.

 

def __getitem__(self, idx):
    if not self.is_test:
        x, y = self.sequences[idx]
        return torch.FloatTensor(x), torch.FloatTensor(y)
    else:
        return torch.FloatTensor(self.sequences[idx])

 

  • 데이터셋에서 주어진 인덱스 idx에 해당하는 데이터를 반환한다.
    • 훈련 데이터: 입력 시퀀스 x와 타겟 값 y를 반환한다.
    • 테스트 데이터: 타겟 값 없이 입력 시퀀스 x만 반환한다.
  • 반환되는 값은 torch.FloatTensor로 변환되어 PyTorch 모델에서 사용할 수 있는 형태로 제공된다.

요약

  • **AgriculturePriceDataset**은 주어진 데이터에서 일정한 기간 동안의 입력과 그 이후의 타겟 값을 쌍으로 만들어 모델 학습에 필요한 데이터를 제공한다.
  • 훈련 데이터와 테스트 데이터를 다르게 처리하며, 훈련 데이터의 경우 입력과 타겟을, 테스트 데이터의 경우 입력만을 반환한다.
  • 이를 통해 시계열 데이터를 모델에 효율적으로 제공할 수 있으며, 특히 농산물 가격 예측과 같은 시계열 예측 문제에서 사용된다.

 

5. Training and Evaluation

train_model 함수는 모델을 훈련시키고, **혼합 정밀도 학습(Mixed Precision Training)**을 사용하여 메모리 사용량을 최적화하고 훈련 속도를 향상시킨다.
evaluate_model 함수는 검증 데이터셋을 사용하여 모델의 성능을 평가하는 역할을 한다.

train_model 함수

이 함수는 모델을 학습시키기 위해 배치 단위로 데이터를 받아, 모델의 예측 결과와 실제 값의 차이를 기반으로 손실을 계산한 후, 이를 바탕으로 모델의 가중치를 업데이트하는 과정이다.

# Training function with mixed precision training
def train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device):
    model.train()  # 모델을 학습 모드로 설정
    total_loss = 0  # 총 손실 초기화

    # 미니배치 학습
    for batch_x, batch_y in train_loader:
        # 배치를 GPU 또는 CPU로 이동
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        
        # 옵티마이저의 기울기 초기화
        optimizer.zero_grad()
        
        # 혼합 정밀도 학습을 위한 autocast 사용
        with autocast():
            # 모델을 통해 예측값 생성
            outputs = model(batch_x)
            # 예측값과 실제 값의 손실 계산
            loss = criterion(outputs, batch_y)
        
        # 손실을 역전파 (backpropagation) 하여 모델의 가중치 업데이트
        scaler.scale(loss).backward()  # 손실 값에 대한 스케일링 적용
        scaler.step(optimizer)  # 스케일링된 손실 값으로 옵티마이저를 사용해 가중치 업데이트
        scaler.update()  # 스케일러 업데이트
        
        # 스케줄러를 사용해 학습률 조정
        scheduler.step()
        
        # 배치의 손실을 총 손실에 더함
        total_loss += loss.item()
    
    # 평균 손실 반환
    return total_loss / len(train_loader)

 

설명

  1. model.train():
    모델을 학습 모드로 전환한다. 이 설정은 드롭아웃과 배치 정규화 등의 레이어가 학습 모드에서 동작하도록 한다.
  2. 데이터 로드 및 장치로 이동:
    미니배치 단위로 데이터를 train_loader에서 받아, device(GPU 또는 CPU)로 이동시킨다.
batch_x, batch_y = batch_x.to(device), batch_y.to(device)

 

3.혼합 정밀도 학습 (autocast):
**autocast()**는 혼합 정밀도 학습을 가능하게 하는 컨텍스트 매니저이다. 이를 통해 일부 연산이 FP16(반 정밀도)로 처리되어 메모리 효율성과 학습 속도가 향상된다.

with autocast():
    outputs = model(batch_x)  # 모델 예측
    loss = criterion(outputs, batch_y)  # 손실 계산

 

4.손실 역전파 및 옵티마이저 업데이트:
scaler.scale(loss)손실 값을 스케일링하여, 낮은 정밀도의 연산에서 발생할 수 있는 수치 불안정을 방지한다. 이후 backward()를 통해 손실을 역전파하고, 옵티마이저로 가중치를 업데이트한다.

 

손실역전파 : 딥러닝 모델에서 손실 함수를 기준으로 가중치를 업데이트하기 위해 사용하는 알고리즘

scaler.scale(loss).backward()  # 손실 역전파
scaler.step(optimizer)  # 옵티마이저 업데이트
scaler.update()  # 스케일러 업데이트

 

학습률 스케줄링:
스케줄러학습률을 동적으로 조정하는 역할을 한다. 여기서는 CosineAnnealingWarmRestarts 같은 스케줄러를 사용해 학습률을 조정하며, 매 미니배치마다 학습률을 업데이트한다.

 

스케줄러 : 딥러닝 모델을 학습할 때 **학습률(learning rate)**을 동적으로 조절하는 방법을 말한다. 학습률은 가중치 업데이트의 크기를 결정하는 중요한 하이퍼파라미터로, 학습이 진행됨에 따라 학습률을 변화시키면 더 좋은 성능을 얻을 수 있다. 스케줄러는 학습 과정 중에 학습률을 어떻게 변화시킬지를 제어하는 역할

scheduler.step()

 

손실 계산:
각 미니배치의 손실을 누적하여, 총 손실을 계산하고 평균 손실을 반환한다.

return total_loss / len(train_loader)

 

evaluate_model 함수

evaluate_model 함수는 학습된 모델이 검증 데이터셋에서 어떻게 성능을 발휘하는지 평가하는 함수다. 이 함수는 모델을 평가 모드로 전환하고, 검증 데이터셋에서 예측을 수행한 뒤 손실 값을 계산한다.

# Evaluation function
def evaluate_model(model, test_loader, criterion, device):
    model.eval()  # 모델을 평가 모드로 설정
    total_loss = 0  # 총 손실 초기화
    
    with torch.no_grad():  # 평가에서는 기울기 계산을 하지 않음
        for batch_x, batch_y in test_loader:
            # 배치를 GPU 또는 CPU로 이동
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            
            # 모델을 통해 예측값 생성
            outputs = model(batch_x)
            # 예측값과 실제 값의 손실 계산
            loss = criterion(outputs, batch_y)
            # 배치의 손실을 총 손실에 더함
            total_loss += loss.item()
    
    # 평균 손실 반환
    return total_loss / len(test_loader)

 

설명

  1. model.eval():
    모델을 평가 모드로 전환한다. 이 모드는 드롭아웃과 배치 정규화 레이어가 평가 모드에서 동작하도록 하며, 학습 시에만 필요한 기울기 계산을 하지 않는다.
  2. 기울기 비활성화 (torch.no_grad()):
    평가 과정에서는 기울기 계산이 필요 없으므로 **torch.no_grad()**를 사용하여 메모리 사용량을 줄이고, 계산 속도를 높인다.
with torch.no_grad():

 

데이터 로드 및 장치로 이동:
검증 데이터(test_loader)에서 배치를 로드하여, device로 이동시킨다.

batch_x, batch_y = batch_x.to(device), batch_y.to(device)

 

모델 예측 및 손실 계산:
모델의 예측 값을 생성한 후, 실제 값과의 손실을 계산하여 누적한다.

outputs = model(batch_x)  # 예측값 생성
loss = criterion(outputs, batch_y)  # 손실 계산

 

평균 손실 계산:
각 미니배치의 손실을 모두 더한 후, 검증 데이터 전체에 대한 평균 손실을 반환한다.

return total_loss / len(test_loader)

 

train_and_predict 함수

train_and_predict 함수는 농산물 품목 리스트에 대해 데이터를 전처리한 후, 각 품목별로 모델을 학습시키고, 테스트 데이터를 기반으로 가격을 예측하는 전체적인 학습 및 예측 파이프라인을 정의한 함수이다. 각 품목에 대해 모델을 학습시키며, 최적의 모델을 저장하고, 이를 바탕으로 예측 값을 생성한다.

# Main training loop
def train_and_predict(품목_리스트):
    품목별_predictions = {}  # 각 품목에 대한 예측 결과 저장
    품목별_scalers = {}  # 각 품목별로 사용된 스케일러 저장
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU 사용 가능 시 CUDA 사용

    pbar_outer = tqdm(품목_리스트, desc="품목 처리 중", position=0)
    
    # 품목 리스트에 대해 반복
    for 품목명 in pbar_outer:
        pbar_outer.set_description(f"품목별 전처리 및 모델 학습 -> {품목명}")
        
        # 1. 데이터 전처리 (품목별로 데이터 로드 및 스케일링 처리)
        train_data, scaler = process_all_items_with_full_conditions(
            "./train/train.csv", 
            "./train/meta/TRAIN_산지공판장_2018-2021.csv", 
            "./train/meta/TRAIN_전국도매_2018-2021.csv", 
            품목명
        )
        품목별_scalers[품목명] = scaler  # 각 품목별로 스케일러 저장
        
        # 2. Dataset 및 DataLoader 설정
        dataset = AgriculturePriceDataset(train_data)  # 전처리된 데이터를 Dataset으로 변환
        train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)  # 학습/검증 데이터 분리
        train_loader = DataLoader(train_data, CFG.batch_size, shuffle=True, num_workers=4)  # 학습 데이터 로더
        val_loader = DataLoader(val_data, CFG.batch_size, shuffle=False, num_workers=4)  # 검증 데이터 로더

        # 3. 모델 설정
        input_size = len(dataset.numeric_columns)  # 입력 데이터의 크기
        model = TimeSeriesTransformer(
            input_size=input_size,
            hidden_size=CFG.hidden_size,
            num_layers=CFG.num_layers,
            num_heads=CFG.num_heads,
            output_size=CFG.output_size,
            dropout=CFG.dropout
        ).to(device)  # 모델을 GPU 또는 CPU로 이동

        # 4. 손실 함수, 옵티마이저, 스케줄러 설정
        criterion = nn.HuberLoss()  # 손실 함수: HuberLoss
        optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay)  # AdamW 옵티마이저
        scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)  # 학습률 스케줄러: CosineAnnealingWarmRestarts
        scaler = GradScaler()  # 혼합 정밀도 학습을 위한 GradScaler

        # 5. 학습 루프
        best_val_loss = float('inf')  # 초기 최저 검증 손실 설정
        os.makedirs('models', exist_ok=True)  # 모델 저장을 위한 디렉토리 생성

        for epoch in range(CFG.epoch):  # 에포크 반복
            train_loss = train_model(model, train_loader, criterion, optimizer, scheduler, scaler, device)  # 학습
            val_loss = evaluate_model(model, val_loader, criterion, device)  # 검증
            
            # 최적의 검증 손실을 기록하고, 해당 모델 저장
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), f'models/best_model_{품목명}.pth')  # 최적의 모델 저장
            
            print(f'Epoch {epoch+1}/{CFG.epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        
        # 6. 테스트 데이터에 대한 추론 (Inference)
        품목_predictions = []  # 각 품목에 대한 예측 결과 저장
        pbar_inner = tqdm(range(25), desc="테스트 파일 추론 중", position=1, leave=False)
        
        for i in pbar_inner:
            # 테스트 데이터 파일 로드
            test_file = f"./test/TEST_{i:02d}.csv"
            산지공판장_file = f"./test/meta/TEST_산지공판장_{i:02d}.csv"
            전국도매_file = f"./test/meta/TEST_전국도매_{i:02d}.csv"
            
            # 7. 테스트 데이터 전처리
            test_data, _ = process_data(test_file, 산지공판장_file, 전국도매_file, 품목명, scaler=품목별_scalers[품목명])
            test_dataset = AgriculturePriceDataset(test_data, is_test=True)  # 테스트 데이터셋 생성
            test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)  # 테스트 데이터 로더

            model.eval()  # 모델을 평가 모드로 전환
            predictions = []
            with torch.no_grad():  # 기울기 계산 비활성화 (추론 시에는 불필요)
                for batch in test_loader:
                    batch = batch.to(device)  # 배치를 GPU 또는 CPU로 이동
                    output = model(batch)  # 모델로부터 예측값 생성
                    predictions.append(output.cpu().numpy())  # 예측 결과 저장
            
            predictions_array = np.concatenate(predictions)  # 예측 결과를 배열로 변환
            
            # 8. 예측값을 원래 스케일로 역변환
            price_column_index = test_data.columns.get_loc(test_dataset.price_column)  # 가격 컬럼의 인덱스 찾기
            predictions_reshaped = predictions_array.reshape(-1, 1)
            
            price_scaler = MinMaxScaler()  # 가격을 역변환할 스케일러
            price_scaler.min_ = 품목별_scalers[품목명].min_[price_column_index]  # 저장된 스케일러 값 복원
            price_scaler.scale_ = 품목별_scalers[품목명].scale_[price_column_index]
            predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped)  # 원래 스케일로 복원
            
            if np.isnan(predictions_original_scale).any():
                pbar_inner.set_postfix({"상태": "NaN"})  # NaN이 발생하면 상태 출력
            else:
                pbar_inner.set_postfix({"상태": "정상"})  # 정상일 경우 상태 출력
                품목_predictions.extend(predictions_original_scale.flatten())  # 예측 결과 저장
                
        품목별_predictions[품목명] = 품목_predictions  # 품목별 예측 결과 저장
        pbar_outer.update(1)  # 진행 바 업데이트
    
    return 품목별_predictions  # 모든 품목에 대한 예측 결과 반환

 

전처리 및 데이터 로딩:

  • process_all_items_with_full_conditions 함수를 사용해 훈련 데이터를 전처리하고, AgriculturePriceDataset을 통해 데이터를 PyTorch의 Dataset으로 변환한다.
  • train_test_split을 사용해 훈련 데이터와 검증 데이터를 나눈 후, 이를 DataLoader로 감싸서 배치 단위로 학습에 사용할 수 있도록 준비한다.
train_data, scaler = process_all_items_with_full_conditions(...)
dataset = AgriculturePriceDataset(train_data)
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_data, ...)
val_loader = DataLoader(val_data, ...)

 

모델 학습 및 검증:

  • TimeSeriesTransformer 모델을 설정한 후, 학습 과정에서 사용될 손실 함수(HuberLoss), 옵티마이저(AdamW), 스케줄러(CosineAnnealingWarmRestarts), 그리고 혼합 정밀도 학습을 위한 GradScaler를 정의한다.
  • 매 에포크마다 train_model 함수를 사용해 학습을 진행하고, evaluate_model 함수를 사용해 검증 데이터를 평가한다. 검증 손실이 가장 낮을 때 모델을 저장한다.
model = TimeSeriesTransformer(...)
criterion = nn.HuberLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.learning_rate, weight_decay=CFG.weight_decay)
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
scaler = GradScaler()

for epoch in range(CFG.epoch):
    train_loss = train_model(model, train_loader, ...)
    val_loss = evaluate_model(model, val_loader, ...)
    
    if val_loss < best_val_loss:
        torch.save(model.state_dict(), f'models/best_model_{품목명}.pth')

 

테스트 데이터 추론 및 예측 값 저장:

  • 테스트 데이터 파일을 로드한 후, process_data로 전처리하여 테스트 데이터를 생성한다.
  • model.eval()을 호출하여 모델을 평가 모드로 전환한 후, 기울기 계산을 비활성화(torch.no_grad())하고, 테스트 데이터에 대한 예측을 수행한다.
  • 예측된 값을 원래 스케일로 복원한 후, 최종 예측 값을 저장한다.
model.eval()
with torch.no_grad():
    for batch in test_loader:
        batch = batch.to(device)
        output = model(batch)
        predictions.append(output.cpu().numpy())

predictions_array = np.concatenate(predictions)
predictions_original_scale = price_scaler.inverse_transform(predictions_reshaped)
품목_predictions.extend(predictions_original_scale.flatten())

 

결과 반환:

  • 모든 품목에 대한 예측 결과를 품목별_predictions에 저장하고 반환한다.
return 품목별_predictions

 


1. Time2Vec Layer

  • Time2Vec 레이어는 시간 정보를 벡터화하는 역할을 한다. 이는 Linear 변환과 주기적(sin) 변환으로 구성되어 있으며, 시간을 특성으로 입력받아 이를 다양한 주기로 변환하여 모델에 추가적인 정보를 제공한다.

2. Transformer Encoder Block

  • TransformerBlock은 기본적인 트랜스포머의 인코더 블록을 정의한다. 멀티헤드 어텐션, 레이어 정규화, 피드포워드 네트워크를 포함하고 있으며, 입력 데이터의 패턴을 학습하여 중요한 정보를 추출한다.

3. Main Model (TimeSeriesTransformer)

  • TimeSeriesTransformer 모델은 입력 데이터를 받아 Time2Vec 레이어로 처리한 후, 여러 개의 트랜스포머 블록을 거쳐 최종적으로 가격을 예측한다. 위치 인코딩(position encoding)을 사용하여 시계열 데이터의 순서를 모델이 인식할 수 있도록 한다.

4. Dataset Class (AgriculturePriceDataset)

  • AgriculturePriceDataset 클래스는 데이터셋을 생성하는데 사용된다. 주어진 기간 동안의 농산물 가격 데이터를 윈도우 크기로 나누어 모델에 학습 데이터를 제공하며, 훈련용과 테스트용 데이터셋을 다르게 처리한다.

5. Training and Evaluation

  • train_model 함수는 모델을 훈련시키는 함수이며, 혼합 정밀도 학습(mixed precision training)을 통해 성능을 최적화한다.
  • evaluate_model 함수는 검증 데이터셋을 사용하여 모델의 성능을 평가한다.

6. Main Training Loop

  • train_and_predict 함수는 품목 리스트에 대해 데이터를 전처리하고, 각 품목에 대해 모델을 학습시킨 후 테스트 데이터를 추론하여 예측값을 생성한다. 각 품목별로 최적의 모델을 저장하며, 이를 바탕으로 예측값을 저장한다.

7. Prediction and Submission

  • 최종적으로, 예측된 값들을 sample_submission 파일에 저장하고 이를 제출 파일로 변환한다.