# mahjong_vision/tools.py
# Author: Patrick Jura
# Commit cb97b42 — "Add live_feed script and accompanying vision model notebook"
import os
import mlflow
import random
import concurrent.futures
import numpy as np
from mlflow import MlflowClient
from scipy.sparse import csr_matrix
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch
import torch.nn.functional as F
import torch.nn as nn
import matplotlib.pyplot as plt
def translate_to_vision(number):
    """Map a tile index (0-33) to its short vision label.

    Indices 0-8 are the 'n' suit ('1n'-'9n'), 9-17 the 'p' suit,
    18-26 the 'b' suit, 27-30 the winds (ew, sw, ww, nw) and
    31-33 the dragons (wd, gd, rd).

    Returns:
        str: the tile label, or "Invalid number" for any other key.
    """
    labels = (
        [f'{i}n' for i in range(1, 10)]
        + [f'{i}p' for i in range(1, 10)]
        + [f'{i}b' for i in range(1, 10)]
        + ['ew', 'sw', 'ww', 'nw', 'wd', 'gd', 'rd']
    )
    return dict(enumerate(labels)).get(number, "Invalid number")
def create_reverse_translation_dict():
    """Build the inverse tile mapping: label (e.g. '1n', 'ew') -> index 0-33.

    Returns:
        dict[str, int]: reverse of the mapping used by translate_to_vision.
    """
    labels = (
        [f'{i}n' for i in range(1, 10)]
        + [f'{i}p' for i in range(1, 10)]
        + [f'{i}b' for i in range(1, 10)]
        + ['ew', 'sw', 'ww', 'nw', 'wd', 'gd', 'rd']
    )
    # Invert by enumerating the labels in index order.
    return {label: index for index, label in enumerate(labels)}
def make_prediction(model, single_data_point, device='cuda'):
    """Run a single forward pass and return class probabilities.

    Parameters:
        model: torch module to run in eval mode.
        single_data_point (torch.Tensor): one sample; it is flattened to 1D
            before a batch dimension is added.
        device (str): device to run inference on.  Defaults to 'cuda',
            matching the previously hard-coded behavior; pass 'cpu' to run
            without a GPU.

    Returns:
        torch.Tensor: softmax probabilities of shape (1, num_classes).
    """
    # Inference mode: disables dropout/batch-norm updates.
    model.eval()
    # Flatten to 1D (reshape also handles non-contiguous tensors, unlike
    # view), add a batch dimension, and move to the target device.
    input_tensor = single_data_point.reshape(-1).unsqueeze(0).to(device)
    with torch.no_grad():  # no gradients needed for inference
        output = model(input_tensor)
    # Convert raw logits to probabilities.
    return F.softmax(output, dim=1)
def calculate_discard_acc_xgboost(y_true, preds_xgb):
    """Compute discard accuracy, treating honor tiles within a category as equal.

    A prediction counts as correct when it matches the true label exactly,
    or when both labels fall into the same honor category: any wind for any
    wind, any dragon for any dragon.

    BUG FIX: the category ranges were wrong — the code grouped 30-33 as
    "winds" and 27-29 as "dragons".  Per the tile layout used throughout
    this module (see translate_to_vision and PenaltyMatrix), winds are
    indices 27-30 (ew, sw, ww, nw) and dragons are 31-33 (wd, gd, rd).

    Parameters:
        y_true (sequence of int): true class labels.
        preds_xgb (sequence of int): predicted class labels.

    Returns:
        float: fraction of correct predictions; 0.0 for empty input.
    """
    total_predictions = len(y_true)
    if total_predictions == 0:
        # Guard against ZeroDivisionError on empty input.
        return 0.0
    correct_predictions = 0
    for true_label, predicted_label in zip(y_true, preds_xgb):
        # Winds: indices 27-30 — any wind matches any wind.
        if (27 <= true_label <= 30) and (27 <= predicted_label <= 30):
            correct_predictions += 1
        # Dragons: indices 31-33 — any dragon matches any dragon.
        elif (31 <= true_label <= 33) and (31 <= predicted_label <= 33):
            correct_predictions += 1
        # All other tiles must match exactly.
        elif true_label == predicted_label:
            correct_predictions += 1
    return correct_predictions / total_predictions
def reshape_np(np_array):
    """Select the feature columns used by the discard model.

    Keeps columns 68-135 and 238-373 (both inclusive) plus column 510,
    in that order — 205 columns total.

    Parameters:
        np_array (numpy.ndarray): 2D source array with at least 511 columns.

    Returns:
        numpy.ndarray: new array containing only the selected columns.
    """
    columns = [*range(68, 136), *range(238, 374), 510]
    return np_array[:, columns]
class PenaltyMatrix:
    """34x34 penalty lookup that softens within-category tile confusions.

    Every (true, predicted) pair starts at penalty 1.0; pairs inside the
    same group get a reduced symmetric penalty of 0.5.  get_penalty()
    multiplies the looked-up penalty by a per-true-class weight.
    """

    # Index groups whose mutual confusion is penalised less.  Note the
    # terminal group mixes suits (1/9 of each suit), so cross-suit terminal
    # confusions are softened as well.
    _GROUPS = (
        [0, 8, 9, 17, 18, 26],  # terminals: 1n, 9n, 1p, 9p, 1b, 9b
        [27, 28, 29, 30],       # winds: ew, sw, ww, nw
        [31, 32, 33],           # dragons: wd, gd, rd
    )

    def __init__(self, weights=None, device='cuda'):
        self.device = device
        # Default penalty of 1.0 for every (true, predicted) pair.
        self.penalty_matrix = torch.ones(34, 34, device=self.device)
        # Per-class weights; uniform when the caller passes None.
        self.weights = weights if weights is not None else torch.ones(34)
        # Soften every unordered pair within each group.
        for group in self._GROUPS:
            for i in range(len(group)):
                for j in range(i + 1, len(group)):
                    self.cs(group[i], group[j])
        self.penalty_matrix = self.penalty_matrix.to(self.device)
        # BUG FIX: this was `weights.to(self.device)`, which raised
        # AttributeError whenever `weights` was None (the default); move the
        # already-defaulted self.weights instead.
        self.weights = self.weights.to(self.device)
        self.display_matrix()

    def cs(self, n1, n2, weight=0.5):
        """Set a symmetric penalty for the class pair (n1, n2)."""
        self.penalty_matrix[n1, n2] = weight
        self.penalty_matrix[n2, n1] = weight

    def display_matrix(self):
        """Plot the penalty matrix; the diagonal is drawn as -1 purely for contrast."""
        # BUG FIX: move the display copy to the CPU *before* filling the
        # diagonal.  Previously `.cpu()` inside fill_diagonal produced a
        # throwaway copy on CUDA devices, so the -1 diagonal was lost there.
        display_matrix = self.penalty_matrix.clone().cpu()
        np.fill_diagonal(display_matrix.numpy(), -1)
        plt.figure(figsize=(10, 10))
        plt.imshow(display_matrix, cmap='viridis_r', interpolation='none', vmin=-1, vmax=1)
        plt.colorbar(ticks=[-1, 0, 1])  # ticks for the colorbar
        plt.title('Penalty Matrix')
        plt.show()

    def get_penalty(self, targets, predictions):
        """Return per-sample penalties: matrix[target, prediction] * weights[target]."""
        # Ensure the index tensors live on the same device as the matrix.
        targets = targets.to(self.device)
        predictions = predictions.to(self.device)
        return self.penalty_matrix[targets, predictions] * self.weights[targets]
class CustomCrossEntropyLoss(nn.Module):
    """Cross-entropy loss scaled per-sample by a tile-category penalty matrix."""

    def __init__(self, weights=None, device='cpu'):
        super(CustomCrossEntropyLoss, self).__init__()
        # Penalty lookup shared across all forward passes.
        self.penalty_matrix = PenaltyMatrix(weights, device)

    def forward(self, outputs, targets):
        # Per-sample cross entropy (no reduction yet).
        per_sample_loss = F.cross_entropy(outputs, targets, reduction='none')
        # Look up the penalty for each (true, predicted-argmax) pair.
        predicted = torch.argmax(outputs, dim=1)
        penalties = self.penalty_matrix.get_penalty(targets, predicted)
        # Scale each sample's loss by its penalty, then average over the batch.
        return (per_sample_loss * penalties).mean()
class EarlyStopping:
    """Signal when a monitored validation loss has stopped improving.

    Call the instance with the current validation loss after each epoch;
    a truthy return means the loss has failed to improve (by at least
    `delta`) for `patience` consecutive calls and training should stop.
    """

    def __init__(self, patience=5, delta=0):
        self.patience = patience    # non-improving epochs tolerated before stopping
        self.counter = 0            # consecutive non-improving epochs so far
        self.best_score = None      # best (negated) loss observed so far
        self.delta = delta          # minimum change that counts as improvement

    def __call__(self, val_loss):
        # Negate the loss so that "higher score is better".
        score = -val_loss
        improved = self.best_score is None or score >= self.best_score + self.delta
        if improved:
            # First observation or a genuine improvement: record it and
            # reset the patience counter.
            self.best_score = score
            self.counter = 0
            return False
        # No sufficient improvement this epoch.
        self.counter += 1
        return self.counter >= self.patience
def load_and_convert(file_path):
    """Load a CSR matrix stored in an .npz archive and return it densified.

    The archive must contain the standard CSR component arrays:
    'data', 'indices', 'indptr' and 'shape'.

    BUG FIX: np.load on an .npz keeps the underlying file handle open; the
    original never closed it, leaking a descriptor per file.  The archive is
    now used as a context manager so it is closed deterministically.

    Parameters:
        file_path (str): path to the .npz file.

    Returns:
        numpy.matrix: dense representation of the stored sparse matrix.
    """
    with np.load(file_path, allow_pickle=True) as archive:
        matrix = csr_matrix(
            (archive['data'], archive['indices'], archive['indptr']),
            shape=archive['shape'],
        )
    return matrix.todense()
def create_dataloader(dataset, random_batch_size=False, default_batch_size=256, shuffle=True):
    """Wrap `dataset` in a torch DataLoader.

    Parameters:
        dataset: anything DataLoader accepts (Dataset, list, ...).
        random_batch_size (bool): when True, pick the batch size at random
            from {32, 64, 128, 256, 512} instead of using the default.
        default_batch_size (int): batch size used when not randomized.
        shuffle (bool): forwarded to DataLoader.

    Returns:
        torch.utils.data.DataLoader: loader over `dataset`.
    """
    batch_size = random.choice([32, 64, 128, 256, 512]) if random_batch_size else default_batch_size
    print(f'Batch Size of data_loader: {batch_size}.')
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
def parallel_load_and_convert(file):
    """Module-level wrapper around load_and_convert.

    Exists so the loader can be handed to executor.map in
    load_random_files, which needs a plain top-level callable.
    """
    return load_and_convert(file)
def get_tenhou_files(folder_path=r"N:\tenhou_data\discard_datasets\2019"):
    """Return full paths of every .npz file directly inside `folder_path`.

    Parameters:
        folder_path (str): directory to scan (non-recursive); defaults to
            the 2019 Tenhou discard-dataset share.

    Returns:
        list[str]: absolute-ish paths (folder_path joined with each name).
    """
    return [
        os.path.join(folder_path, entry)
        for entry in os.listdir(folder_path)
        if entry.endswith('.npz')
    ]
def load_latest_learner(experiment_id):
    """Load the FastAI model from the most recent finished run of an experiment.

    Parameters:
        experiment_id: MLflow experiment id to search.

    Returns:
        The FastAI learner logged under the 'model' artifact of the newest
        run whose status is FINISHED.

    Raises:
        ValueError: if the experiment has no finished runs.
    """
    # Fetch all runs of the experiment as a DataFrame.
    # (The original also built an unused MlflowClient here; removed.)
    runs = mlflow.search_runs(experiment_ids=[experiment_id])
    # Keep only runs that completed successfully.
    finished_runs = runs[runs['status'] == 'FINISHED']
    if finished_runs.empty:
        raise ValueError("Es gibt keine abgeschlossenen Runs für das gegebene Experiment.")
    # Newest run first.
    finished_runs = finished_runs.sort_values(by="start_time", ascending=False)
    latest_finished_run_id = finished_runs.iloc[0].run_id
    # Load the FastAI model artifact from that run.
    artifact_uri = f"runs:/{latest_finished_run_id}/model"
    return mlflow.fastai.load_model(model_uri=artifact_uri)
def load_best_model(experiment_id, metric, ascending=False, xgboost=True):
    """Load the best model of an experiment according to a logged metric.

    Parameters:
        experiment_id: ID of the MLflow experiment.
        metric: name of the metric to rank runs by.
        ascending: sort the metric ascending (True) or descending (False).
        xgboost: load via mlflow.xgboost when True, mlflow.pytorch otherwise.

    Returns:
        The loaded model, or None when the experiment has no runs.
    """
    # BUG FIX: MLflow's search order_by qualifier is 'metrics.' (plural);
    # the original used 'metric.', which is not a valid entity prefix.
    order = f"metrics.{metric} {'ASC' if ascending else 'DESC'}"
    runs = mlflow.search_runs(experiment_ids=[experiment_id], order_by=[order])
    if len(runs) == 0:
        print("Keine Runs gefunden.")
        return None
    # Best run is first after sorting.
    best_run = runs.iloc[0]
    model_uri = f"runs:/{best_run.run_id}/model"
    print(model_uri)
    if xgboost:
        return mlflow.xgboost.load_model(model_uri)
    return mlflow.pytorch.load_model(model_uri)
def load_random_files(files, random_get=True, amount=10):
    """Randomly sample `amount * 1024` file paths and load them concurrently.

    Each sampled file is read via parallel_load_and_convert (CSR .npz ->
    dense matrix) using a 24-worker thread pool, with a tqdm progress bar.

    Parameters:
        files (list[str]): candidate file paths.
        random_get (bool): currently unused — files are always sampled
            randomly.  NOTE(review): either honor this flag or remove it.
        amount (int): number of 1024-file groups to sample; random.sample
            raises ValueError if `files` has fewer than amount * 1024 entries.

    Returns:
        list: loaded matrices, in the order the sampled files were submitted.
    """
    files_subset = random.sample(files, amount * 1024)
    # Threads (not processes): presumably the work is I/O-bound file reads,
    # so 24 workers overlap the disk waits — TODO confirm on target storage.
    with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
        all_data = list(tqdm(executor.map(parallel_load_and_convert, files_subset), total=len(files_subset),
                             desc="Loading and converting files"))
    return all_data
# %%