import torch import torch.nn as nn import numpy as np import pickle import json import os class ImprovedCashFlowLSTM(nn.Module): def __init__(self, input_size, hidden_size=128, num_layers=2, forecast_horizon=13, dropout=0.2): super(ImprovedCashFlowLSTM, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.forecast_horizon = forecast_horizon self.lstm = nn.LSTM( input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True ) self.output_layers = nn.Sequential( nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_size, forecast_horizon) ) def forward(self, x): lstm_out, (hidden, cell) = self.lstm(x) last_hidden = lstm_out[:, -1, :] forecast = self.output_layers(last_hidden) return forecast def load_model_and_artifacts( model_path="new_best_improved_model.pth", scaler_path="scaler.pkl", feature_names_path="feature_names.json", config_path="model_config.json" ): if not all(os.path.exists(path) for path in [model_path, scaler_path, feature_names_path, config_path]): missing = [path for path in [model_path, scaler_path, feature_names_path, config_path] if not os.path.exists(path)] raise FileNotFoundError(f"Missing files: {missing}") with open(config_path, "r") as f: config = json.load(f) with open(scaler_path, "rb") as f: scaler = pickle.load(f) with open(feature_names_path, "r") as f: feature_names = json.load(f) input_size = config["input_size"] model = ImprovedCashFlowLSTM( input_size=input_size, hidden_size=config["hidden_size"], num_layers=config["num_layers"], forecast_horizon=config["forecast_horizon"], dropout=config["dropout"] ) model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu"))) model.eval() return model, scaler, feature_names, config def predict(model, scaler, sequences): device = torch.device("cpu") model.to(device) # Validate input shape: (batch_size, sequence_length=21, n_features=20) if len(sequences.shape) != 3 or sequences.shape[1] != 21 or sequences.shape[2] != model.lstm.input_size: raise ValueError(f"Expected input shape (batch_size, 21, {model.lstm.input_size}), got {sequences.shape}") # Convert to tensor sequences = torch.tensor(sequences, dtype=torch.float32).to(device) # Generate predictions with torch.no_grad(): predictions = model(sequences).cpu().numpy() # Shape: (batch_size, 13) # Inverse transform predictions (sales is first feature) dummy = np.zeros((predictions.shape[0] * predictions.shape[1], scaler.n_features_in_)) dummy[:, 0] = predictions.flatten() rescaled = scaler.inverse_transform(dummy)[:, 0].reshape(predictions.shape) # Ensure non-negative predictions and clip to training range rescaled = np.maximum(rescaled, 0) rescaled = np.clip(rescaled, 3000, 19372) # Training sales range: $3069–19372 # Estimate uncertainty (simplified: std of predictions + base uncertainty) uncertainties = np.std(rescaled, axis=1, keepdims=True) + 100 uncertainties = np.clip(uncertainties, 100, 500) return rescaled, uncertainties