"""Inference utilities for the cash-flow LSTM forecaster.

Contains the network definition, artifact loading, feature derivation from a
minimal input table, and forecast generation with a heuristic uncertainty
band.
"""

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import pickle
import json
import os
from datetime import datetime


class ImprovedCashFlowLSTM(nn.Module):
    """LSTM encoder followed by a two-layer MLP head.

    The head maps the LSTM output at the last time step to
    ``forecast_horizon`` values in a single shot.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2,
                 forecast_horizon=13, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.forecast_horizon = forecast_horizon

        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            # nn.LSTM only applies dropout between stacked layers, so it is
            # disabled for a single-layer network.
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True,
        )
        self.output_layers = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, forecast_horizon),
        )

    def forward(self, x):
        """Return a ``(batch, forecast_horizon)`` forecast for batch-first ``x``."""
        lstm_out, _ = self.lstm(x)
        # Only the output at the final time step feeds the forecasting head.
        last_hidden = lstm_out[:, -1, :]
        return self.output_layers(last_hidden)


def load_model_and_artifacts(
    model_path="new_best_improved_model.pth",
    scaler_path="scaler.pkl",
    feature_names_path="feature_names.json",
    config_path="model_config.json",
):
    """Load model, scaler, feature names, and configuration.

    Returns:
        Tuple ``(model, scaler, feature_names, config)``; the model is on CPU
        and already switched to eval mode.

    Raises:
        FileNotFoundError: if any of the four artifact files is missing; the
            message lists every missing path.
    """
    required_paths = [model_path, scaler_path, feature_names_path, config_path]
    missing = [path for path in required_paths if not os.path.exists(path)]
    if missing:
        raise FileNotFoundError(f"Missing files: {missing}")

    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    # SECURITY NOTE: pickle.load executes arbitrary code embedded in the file.
    # Only load scaler files that come from a trusted source.
    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

    with open(feature_names_path, "r", encoding="utf-8") as f:
        feature_names = json.load(f)

    model = ImprovedCashFlowLSTM(
        input_size=config["input_size"],
        hidden_size=config["hidden_size"],
        num_layers=config["num_layers"],
        forecast_horizon=config["forecast_horizon"],
        dropout=config["dropout"],
    )
    # SECURITY NOTE: torch.load may unpickle arbitrary objects depending on
    # the installed torch version; only load trusted checkpoints.
    model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))
    model.eval()

    return model, scaler, feature_names, config


def derive_features(df, feature_names, sequence_length=21):
    """Derive the model's feature matrix from a minimal input table.

    The input must provide ``date``, ``sales``, ``onpromotion``,
    ``dcoilwtico`` and ``is_holiday``. Trend, cyclical calendar encodings,
    sales lags, moving averages/ratios and promotion lags are computed from
    those. Names in ``feature_names`` that are not produced here stay at 0.0.

    Args:
        df: DataFrame with exactly ``sequence_length`` rows. It is NOT
            modified by this function.
        feature_names: Column order of the returned matrix.
        sequence_length: Required number of rows.

    Returns:
        ``float64`` array of shape ``(sequence_length, len(feature_names))``.

    Raises:
        ValueError: on missing columns or a wrong row count.
    """
    required_columns = ["date", "sales", "onpromotion", "dcoilwtico", "is_holiday"]
    if not all(col in df.columns for col in required_columns):
        raise ValueError(f"CSV must contain columns: {', '.join(required_columns)}")
    if len(df) != sequence_length:
        raise ValueError(f"CSV must have exactly {sequence_length} rows, got {len(df)}")

    # Parse dates into a local Series instead of writing back into ``df`` so
    # the caller's DataFrame is left untouched.
    dates = pd.to_datetime(df["date"])
    sales = df["sales"]
    promo = df["onpromotion"]

    # Start from zeros so any feature name not derived below is still present.
    feature_df = pd.DataFrame(0.0, index=df.index, columns=feature_names)

    # Direct pass-through features.
    for col in ["sales", "onpromotion", "dcoilwtico", "is_holiday"]:
        feature_df[col] = df[col]

    # Temporal features.
    day_of_week = dates.dt.dayofweek
    month = dates.dt.month
    feature_df["trend"] = np.linspace(0, sequence_length - 1, sequence_length)
    feature_df["dayofweek_sin"] = np.sin(2 * np.pi * day_of_week / 7)
    feature_df["dayofweek_cos"] = np.cos(2 * np.pi * day_of_week / 7)
    feature_df["month_sin"] = np.sin(2 * np.pi * month / 12)
    feature_df["month_cos"] = np.cos(2 * np.pi * month / 12)
    feature_df["quarter"] = dates.dt.quarter
    feature_df["is_weekend"] = (day_of_week >= 5).astype(float)

    # Sales lags; rows without history are back-filled with the first value.
    for lag in [1, 2, 3]:
        feature_df[f"lag_{lag}"] = sales.shift(lag).fillna(sales.iloc[0])

    # Moving averages and sales-to-average ratios (epsilon avoids 0-division).
    for window in [7, 14]:
        moving_avg = sales.rolling(window=window, min_periods=1).mean()
        feature_df[f"ma_{window}"] = moving_avg
        feature_df[f"ratio_{window}"] = sales / (moving_avg + 1e-8)

    # Promotion lags, back-filled the same way as the sales lags.
    for lag in [1, 2]:
        feature_df[f"promo_lag_{lag}"] = promo.shift(lag).fillna(promo.iloc[0])

    # Force a numeric dtype: a bool/int/float column mix would otherwise
    # yield an object array that cannot be converted to a tensor.
    return feature_df[feature_names].to_numpy(dtype=np.float64)


def predict(model, scaler, sequences, *, sequence_length=21,
            sales_range=(3000, 19372)):
    """Generate sales forecasts from input sequences.

    Args:
        model: Network exposing ``model.lstm.input_size``.
        scaler: Fitted scaler exposing ``n_features_in_`` and
            ``inverse_transform``; the target (sales) is assumed to be its
            first feature.
        sequences: Array-like or tensor of shape
            ``(batch_size, sequence_length, n_features)``.
        sequence_length: Expected window length (keyword-only).
        sales_range: ``(low, high)`` clipping bounds for the rescaled
            forecast (keyword-only). The default is the approximate training
            sales range ($3069-19372).

    Returns:
        Tuple ``(forecast, uncertainty)`` of arrays shaped
        ``(batch_size, forecast_horizon)``.

    Raises:
        ValueError: if ``sequences`` does not have the expected shape.
    """
    device = torch.device("cpu")
    model.to(device)

    # Accept lists/arrays/tensors alike without re-wrapping existing tensors.
    if isinstance(sequences, torch.Tensor):
        batch = sequences.detach().to(device=device, dtype=torch.float32)
    else:
        batch = torch.as_tensor(np.asarray(sequences, dtype=np.float32), device=device)

    n_features = model.lstm.input_size
    if batch.dim() != 3 or batch.shape[1] != sequence_length or batch.shape[2] != n_features:
        raise ValueError(
            f"Expected input shape (batch_size, {sequence_length}, {n_features}), "
            f"got {tuple(batch.shape)}"
        )

    # Inference must run with dropout disabled; restore the caller's mode
    # afterwards so a surrounding training loop is not affected.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            predictions = model(batch).cpu().numpy()  # (batch_size, horizon)
    finally:
        model.train(was_training)

    # The scaler was fitted on all features, so place the predictions in the
    # first (sales) column of a zero-padded matrix to invert the scaling.
    dummy = np.zeros((predictions.size, scaler.n_features_in_))
    dummy[:, 0] = predictions.ravel()
    rescaled = scaler.inverse_transform(dummy)[:, 0].reshape(predictions.shape)

    # Ensure non-negative predictions, then clip to the configured range.
    low, high = sales_range
    rescaled = np.maximum(rescaled, 0)
    rescaled = np.clip(rescaled, low, high)

    # Heuristic uncertainty: scaled per-sequence std plus a floor, repeated
    # for every forecast step and bounded to [100, 1000].
    uncertainties = 0.2 * np.std(rescaled, axis=1, keepdims=True) + 100
    uncertainties = np.repeat(uncertainties, rescaled.shape[1], axis=1)
    uncertainties = np.clip(uncertainties, 100, 1000)

    return rescaled, uncertainties