"""Helper utilities for training and evaluating discard-prediction models.

Contains label translation tables, a penalty-weighted cross-entropy loss,
early stopping, sparse ``.npz`` loading helpers and MLflow model loaders.
"""

import concurrent.futures
import itertools
import os
import random

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from mlflow import MlflowClient
from scipy.sparse import csr_matrix
from torch.utils.data import DataLoader
from tqdm import tqdm

# Class index -> short tile code. The position in the tuple is the class index.
_TILE_CODES = (
    '1n', '2n', '3n', '4n', '5n', '6n', '7n', '8n', '9n',
    '1p', '2p', '3p', '4p', '5p', '6p', '7p', '8p', '9p',
    '1b', '2b', '3b', '4b', '5b', '6b', '7b', '8b', '9b',
    'ew', 'sw', 'ww', 'nw',
    'wd', 'gd', 'rd',
)

# Inclusive class-index ranges of the two groups whose members are treated as
# interchangeable by calculate_discard_acc_xgboost (see _TILE_CODES above).
_WIND_RANGE = (27, 30)
_DRAGON_RANGE = (31, 33)


def translate_to_vision(number):
    """Translate a class index (0-33) into its short tile code.

    Parameters:
        number: The class index to translate.

    Returns:
        str: The tile code, or ``"Invalid number"`` for an unknown index.
    """
    translation_dict = dict(enumerate(_TILE_CODES))
    return translation_dict.get(number, "Invalid number")


def create_reverse_translation_dict():
    """Build the reverse lookup table from tile code to class index.

    Returns:
        dict: A new dictionary mapping each tile code to its class index.
    """
    return {code: index for index, code in enumerate(_TILE_CODES)}


def make_prediction(model, single_data_point):
    """Run ``model`` on a single sample and return class probabilities.

    The model is switched to evaluation mode as a side effect.

    Parameters:
        model: A callable torch module.
        single_data_point (torch.Tensor): One sample; it is flattened to 1D
            and given a leading batch dimension of size 1.

    Returns:
        torch.Tensor: Softmax probabilities over dimension 1 of the output.
    """
    model.eval()

    # Flatten to 1D, then add the batch dimension the model expects.
    input_tensor = single_data_point.reshape(-1).unsqueeze(0)

    # Feed the input on the same device as the model (when it has parameters)
    # so that GPU models do not fail with a device mismatch.
    parameters = getattr(model, "parameters", None)
    first_param = next(parameters(), None) if callable(parameters) else None
    if first_param is not None:
        input_tensor = input_tensor.to(first_param.device)

    with torch.no_grad():  # Inference only, no gradients needed.
        output = model(input_tensor)

    # Return probabilities instead of a single predicted class.
    return F.softmax(output, dim=1)


def _honor_group(label):
    """Return 'wind' / 'dragon' for labels in those groups, else ``None``."""
    if _WIND_RANGE[0] <= label <= _WIND_RANGE[1]:
        return 'wind'
    if _DRAGON_RANGE[0] <= label <= _DRAGON_RANGE[1]:
        return 'dragon'
    return None


def calculate_discard_acc_xgboost(y_true, preds_xgb):
    """Compute prediction accuracy, treating tiles of one group as equal.

    A prediction counts as correct when it equals the true label, or when
    both labels are winds (27-30) or both are dragons (31-33). The ranges
    follow the translation table used by ``translate_to_vision``.

    Parameters:
        y_true (numpy.array): The true class labels.
        preds_xgb (numpy.array): The predicted class labels.

    Returns:
        float: The fraction of correct predictions; ``0.0`` for empty input.
    """
    total_predictions = len(y_true)
    if total_predictions == 0:
        # Nothing to score; avoid a division by zero.
        return 0.0

    correct_predictions = 0
    for true_label, predicted_label in zip(y_true, preds_xgb):
        true_group = _honor_group(true_label)
        if true_group is not None and true_group == _honor_group(predicted_label):
            correct_predictions += 1
        elif true_label == predicted_label:
            correct_predictions += 1

    return correct_predictions / total_predictions


def reshape_np(np_array):
    """Extract a fixed set of columns (features) from a 2D numpy array.

    Parameters:
        np_array (numpy.ndarray): The original array; it is indexed as
            ``np_array[:, columns]``.

    Returns:
        numpy.ndarray: A new array holding only columns 68-135, 238-373 and
        510 (205 columns in total), in that order.
    """
    selected_columns = list(range(68, 135 + 1)) + list(range(238, 373 + 1)) + [510]
    return np_array[:, selected_columns]


class PenaltyMatrix:
    """34x34 matrix of loss multipliers indexed by (target, prediction).

    All entries start at 1. Every pair of distinct classes inside one of the
    groups below is set to 0.5 (symmetrically).
    """

    NUM_CLASSES = 34
    TERMINALS = (0, 8, 9, 17, 18, 26)
    WINDS = (27, 28, 29, 30)
    DRAGONS = (31, 32, 33)

    def __init__(self, weights=None, device='cuda', display=True):
        """Build the matrix and, by default, plot it.

        Parameters:
            weights: Optional per-class weights (length 34); defaults to ones.
            device: Torch device on which matrix and weights are stored.
            display (bool): When true (default), ``display_matrix`` is called
                at the end of construction.
        """
        self.device = device
        self.penalty_matrix = torch.ones(
            self.NUM_CLASSES, self.NUM_CLASSES, device=self.device
        )

        # Fall back to uniform weights; calling ``.to`` on ``None`` would fail.
        if weights is None:
            weights = torch.ones(self.NUM_CLASSES)
        self.weights = torch.as_tensor(weights).to(self.device)

        for group in (self.TERMINALS, self.WINDS, self.DRAGONS):
            for first, second in itertools.combinations(group, 2):
                self.cs(first, second)

        if display:
            self.display_matrix()

    def cs(self, n1, n2, weight=0.5):
        """Set the penalty for the class pair (n1, n2) symmetrically."""
        self.penalty_matrix[n1, n2] = weight
        self.penalty_matrix[n2, n1] = weight

    def display_matrix(self):
        """Plot the penalty matrix with the diagonal marked as -1."""
        # Work on a detached CPU copy so the real matrix is never modified and
        # the diagonal marker is kept even when the matrix lives on a GPU.
        display_matrix = self.penalty_matrix.detach().cpu().clone()
        # Use a value outside the matrix's normal range for the diagonal.
        display_matrix.fill_diagonal_(-1)

        plt.figure(figsize=(10, 10))
        plt.imshow(display_matrix.numpy(), cmap='viridis_r', interpolation='none', vmin=-1, vmax=1)
        plt.colorbar(ticks=[-1, 0, 1])  # Ticks for the colour bar.
        plt.title('Penalty Matrix')
        plt.show()

    def get_penalty(self, targets, predictions):
        """Return ``penalty_matrix[targets, predictions] * weights[targets]``.

        Both index tensors are moved to this object's device first.
        """
        targets = targets.to(self.device)
        predictions = predictions.to(self.device)
        return self.penalty_matrix[targets, predictions] * self.weights[targets]


class CustomCrossEntropyLoss(nn.Module):
    """Cross-entropy loss scaled per sample by a ``PenaltyMatrix`` entry."""

    def __init__(self, weights=None, device='cpu'):
        """Create the loss; ``weights`` and ``device`` go to ``PenaltyMatrix``."""
        super().__init__()
        self.penalty_matrix = PenaltyMatrix(weights, device)

    def forward(self, outputs, targets):
        """Return the mean penalty-weighted cross-entropy.

        Parameters:
            outputs (torch.Tensor): Model logits; classes along dimension 1.
            targets (torch.Tensor): True class indices.

        Returns:
            torch.Tensor: Scalar mean of the per-sample weighted losses.
        """
        # Per-sample standard cross-entropy.
        ce_loss = F.cross_entropy(outputs, targets, reduction='none')

        # Look up the penalty for each (true class, predicted class) pair.
        predicted = torch.argmax(outputs, dim=1)
        penalties = self.penalty_matrix.get_penalty(targets, predicted)

        # The penalty matrix may live on another device than the logits.
        custom_loss = ce_loss * penalties.to(ce_loss.device)
        return torch.mean(custom_loss)


class EarlyStopping:
    """Signal a stop after ``patience`` consecutive non-improving calls."""

    def __init__(self, patience=5, delta=0):
        """Store ``patience`` and the minimum improvement ``delta``."""
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.delta = delta

    def __call__(self, val_loss):
        """Record ``val_loss`` and return ``True`` when training should stop.

        The score is ``-val_loss``. A call whose score is below
        ``best_score + delta`` increments the counter; any other call stores
        the new best score and resets the counter.
        """
        score = -val_loss

        if self.best_score is None:
            # First observation becomes the baseline.
            self.best_score = score
            return False

        if score < self.best_score + self.delta:
            self.counter += 1
            return self.counter >= self.patience

        self.best_score = score
        self.counter = 0
        return False


def load_and_convert(file_path):
    """Load a CSR matrix stored in an ``.npz`` archive and densify it.

    The archive must provide the arrays ``indices``, ``indptr``, ``data`` and
    ``shape``.

    Parameters:
        file_path: Path of the ``.npz`` file.

    Returns:
        The dense form of the matrix, as returned by ``todense()``.
    """
    # NOTE(review): allow_pickle=True can execute arbitrary code when loading
    # untrusted files; it is kept in case existing archives rely on it.
    with np.load(file_path, allow_pickle=True) as data:
        indices = data['indices']
        indptr = data['indptr']
        values = data['data']
        shape = data['shape']

    matrix = csr_matrix((values, indices, indptr), shape=shape)
    return matrix.todense()


def create_dataloader(dataset, random_batch_size=False, default_batch_size=256, shuffle=True):
    """Wrap ``dataset`` in a ``DataLoader`` and print the chosen batch size.

    Parameters:
        dataset: The dataset handed to ``DataLoader``.
        random_batch_size (bool): Pick the batch size at random from
            32, 64, 128, 256 and 512 instead of using the default.
        default_batch_size (int): Batch size used when not random.
        shuffle (bool): Passed through to ``DataLoader``.

    Returns:
        DataLoader: The configured loader.
    """
    if random_batch_size:
        batch_size = random.choice([32, 64, 128, 256, 512])
    else:
        batch_size = default_batch_size

    print(f'Batch Size of data_loader: {batch_size}.')
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)


def parallel_load_and_convert(file):
    """Worker entry point for executors; delegates to ``load_and_convert``."""
    return load_and_convert(file)


def get_tenhou_files(folder_path=r"N:\tenhou_data\discard_datasets\2019"):
    """Return the full paths of all ``.npz`` files directly in ``folder_path``."""
    return [
        os.path.join(folder_path, file)
        for file in os.listdir(folder_path)
        if file.endswith('.npz')
    ]


def load_latest_learner(experiment_id):
    """Load the FastAI model of the most recently started finished run.

    Parameters:
        experiment_id: ID of the MLflow experiment to search.

    Returns:
        The model loaded via ``mlflow.fastai.load_model``.

    Raises:
        ValueError: If the experiment has no run with status ``FINISHED``.
    """
    # Fetch all runs of the experiment and keep only the finished ones.
    runs = mlflow.search_runs(experiment_ids=[experiment_id])
    finished_runs = runs[runs['status'] == 'FINISHED']

    if finished_runs.empty:
        raise ValueError("Es gibt keine abgeschlossenen Runs für das gegebene Experiment.")

    # Newest run first.
    finished_runs = finished_runs.sort_values(by="start_time", ascending=False)
    latest_finished_run_id = finished_runs.iloc[0].run_id

    artifact_uri = f"runs:/{latest_finished_run_id}/model"
    return mlflow.fastai.load_model(model_uri=artifact_uri)


def load_best_model(experiment_id, metric, ascending=False, xgboost=True):
    """Load the model of the best run according to a given metric.

    :param experiment_id: ID of the MLflow experiment
    :param metric: Name of the metric used to order the runs
    :param ascending: Whether the metric is sorted in ascending order
    :param xgboost: Load with ``mlflow.xgboost`` if true, else ``mlflow.pytorch``
    :return: The loaded model, or ``None`` when no runs were found
    """
    # Search the runs, ordered by the metric.
    direction = 'ASC' if ascending else 'DESC'
    runs = mlflow.search_runs(experiment_ids=[experiment_id],
                              order_by=[f"metric.{metric} {direction}"])

    if runs.empty:
        print("Keine Runs gefunden.")
        return None

    # The first row is the best run under the requested ordering.
    best_run = runs.iloc[0]
    model_uri = f"runs:/{best_run.run_id}/model"
    print(model_uri)

    if xgboost:
        return mlflow.xgboost.load_model(model_uri)
    return mlflow.pytorch.load_model(model_uri)


def load_random_files(files, random_get=True, amount=10):
    """Load and densify up to ``amount * 1024`` files in parallel threads.

    Parameters:
        files (list): Paths of the candidate ``.npz`` files.
        random_get (bool): Draw the subset at random (default); otherwise
            take the leading files in their given order.
        amount (int): Requested subset size in units of 1024 files. The
            subset is capped at ``len(files)``.

    Returns:
        list: One dense matrix per loaded file, in subset order.
    """
    subset_size = min(amount * 1024, len(files))
    if random_get:
        files_subset = random.sample(files, subset_size)
    else:
        files_subset = list(files[:subset_size])

    with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
        all_data = list(tqdm(executor.map(parallel_load_and_convert, files_subset),
                             total=len(files_subset),
                             desc="Loading and converting files"))
    return all_data

# %%