|
|
import torch |
|
|
import torch.nn as nn |
|
|
import numpy as np |
|
|
import pickle |
|
|
import json |
|
|
import os |
|
|
|
|
|
class ImprovedCashFlowLSTM(nn.Module): |
|
|
def __init__(self, input_size, hidden_size=128, num_layers=2, forecast_horizon=13, dropout=0.2): |
|
|
super(ImprovedCashFlowLSTM, self).__init__() |
|
|
self.hidden_size = hidden_size |
|
|
self.num_layers = num_layers |
|
|
self.forecast_horizon = forecast_horizon |
|
|
self.lstm = nn.LSTM( |
|
|
input_size, |
|
|
hidden_size, |
|
|
num_layers, |
|
|
dropout=dropout if num_layers > 1 else 0, |
|
|
batch_first=True |
|
|
) |
|
|
self.output_layers = nn.Sequential( |
|
|
nn.Linear(hidden_size, hidden_size), |
|
|
nn.ReLU(), |
|
|
nn.Dropout(dropout), |
|
|
nn.Linear(hidden_size, forecast_horizon) |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
lstm_out, (hidden, cell) = self.lstm(x) |
|
|
last_hidden = lstm_out[:, -1, :] |
|
|
forecast = self.output_layers(last_hidden) |
|
|
return forecast |
|
|
|
|
|
def load_model_and_artifacts( |
|
|
model_path="new_best_improved_model.pth", |
|
|
scaler_path="scaler.pkl", |
|
|
feature_names_path="feature_names.json", |
|
|
config_path="model_config.json" |
|
|
): |
|
|
if not all(os.path.exists(path) for path in [model_path, scaler_path, feature_names_path, config_path]): |
|
|
missing = [path for path in [model_path, scaler_path, feature_names_path, config_path] if not os.path.exists(path)] |
|
|
raise FileNotFoundError(f"Missing files: {missing}") |
|
|
|
|
|
with open(config_path, "r") as f: |
|
|
config = json.load(f) |
|
|
|
|
|
with open(scaler_path, "rb") as f: |
|
|
scaler = pickle.load(f) |
|
|
|
|
|
with open(feature_names_path, "r") as f: |
|
|
feature_names = json.load(f) |
|
|
|
|
|
input_size = config["input_size"] |
|
|
model = ImprovedCashFlowLSTM( |
|
|
input_size=input_size, |
|
|
hidden_size=config["hidden_size"], |
|
|
num_layers=config["num_layers"], |
|
|
forecast_horizon=config["forecast_horizon"], |
|
|
dropout=config["dropout"] |
|
|
) |
|
|
model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu"))) |
|
|
model.eval() |
|
|
return model, scaler, feature_names, config |
|
|
|
|
|
def predict(model, scaler, sequences): |
|
|
device = torch.device("cpu") |
|
|
model.to(device) |
|
|
|
|
|
|
|
|
if len(sequences.shape) != 3 or sequences.shape[1] != 21 or sequences.shape[2] != model.lstm.input_size: |
|
|
raise ValueError(f"Expected input shape (batch_size, 21, {model.lstm.input_size}), got {sequences.shape}") |
|
|
|
|
|
|
|
|
sequences = torch.tensor(sequences, dtype=torch.float32).to(device) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
predictions = model(sequences).cpu().numpy() |
|
|
|
|
|
|
|
|
dummy = np.zeros((predictions.shape[0] * predictions.shape[1], scaler.n_features_in_)) |
|
|
dummy[:, 0] = predictions.flatten() |
|
|
rescaled = scaler.inverse_transform(dummy)[:, 0].reshape(predictions.shape) |
|
|
|
|
|
|
|
|
rescaled = np.maximum(rescaled, 0) |
|
|
rescaled = np.clip(rescaled, 3000, 19372) |
|
|
|
|
|
|
|
|
uncertainties = np.std(rescaled, axis=1, keepdims=True) + 100 |
|
|
uncertainties = np.clip(uncertainties, 100, 500) |
|
|
|
|
|
return rescaled, uncertainties |