#!/usr/bin/env python3 """ Script para inferência em pasta de áudios sem labels conhecidas. Classifica todos os áudios recursivamente e salva resultados em CSV. Também salva amostras para verificação qualitativa. """ import os import sys import argparse import glob import torch import librosa import numpy as np import pandas as pd import shutil from pathlib import Path from transformers import AutoFeatureExtractor, AutoModelForAudioClassification from collections import Counter import random from datetime import datetime class AudioInference: def __init__(self, model_path, device=None): """ Inicializa o classificador de áudio. """ self.model_path = model_path self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") # Mapeamento de classes self.label_map = {0: "pt_br", 1: "pt_pt"} # Carregar modelo self._load_model() def _load_model(self): """Carrega o modelo e feature extractor.""" print(f"Carregando modelo de: {self.model_path}") print(f"Usando device: {self.device}") try: self.feature_extractor = AutoFeatureExtractor.from_pretrained(self.model_path) self.model = AutoModelForAudioClassification.from_pretrained(self.model_path) self.model.to(self.device) self.model.eval() print("✓ Modelo carregado com sucesso!") except Exception as e: print(f"✗ Erro ao carregar modelo: {e}") sys.exit(1) def load_audio(self, file_path): """ Carrega um arquivo de áudio. """ try: audio, sr = librosa.load( file_path, sr=self.feature_extractor.sampling_rate, mono=True ) return audio, sr except Exception as e: print(f"Erro ao carregar {file_path}: {e}") return None, None def predict(self, audio_path): """ Classifica um único arquivo de áudio usando janela deslizante. """ # Carregar áudio audio, sr = self.load_audio(audio_path) if audio is None: return None, None, None try: # Configurações da janela deslizante window_size = int(sr * 5.0) # Janela de 5 segundos overlap = int(sr * 2.5) # Sobreposição de 2.5 segundos (50%) # Se o áudio é menor que a janela, usar áudio completo if len(audio) <= window_size: return self._predict_segment(audio, sr) # Aplicar janela deslizante predictions_list = [] confidences_list = [] start = 0 while start < len(audio): end = min(start + window_size, len(audio)) segment = audio[start:end] # Garantir que o segmento tenha tamanho mínimo (1 segundo) if len(segment) >= sr: pred_label, confidence, class_id = self._predict_segment(segment, sr) if pred_label is not None: predictions_list.append(class_id) confidences_list.append(confidence) # Avançar janela start += window_size - overlap # Se chegou no final, parar if end == len(audio): break if not predictions_list: return None, None, None # Combinar predições usando voto majoritário ponderado pela confiança return self._combine_predictions(predictions_list, confidences_list) except Exception as e: print(f"Erro ao processar {audio_path}: {e}") return None, None, None def _predict_segment(self, audio_segment, sr): """ Classifica um segmento individual de áudio. """ try: # Pré-processar segmento inputs = self.feature_extractor( audio_segment, sampling_rate=sr, max_length=int(sr * 5.0), truncation=True, padding=True, return_tensors="pt" ) # Mover para device inputs = {k: v.to(self.device) for k, v in inputs.items()} # Fazer predição with torch.no_grad(): outputs = self.model(**inputs) predictions = torch.nn.functional.softmax(outputs.logits, dim=-1) predicted_class_id = predictions.argmax().item() confidence = predictions.max().item() predicted_label = self.label_map[predicted_class_id] return predicted_label, confidence, predicted_class_id except Exception as e: return None, None, None def _combine_predictions(self, predictions_list, confidences_list): """ Combina múltiplas predições usando voto majoritário ponderado pela confiança. """ # Converter para arrays numpy predictions = np.array(predictions_list) confidences = np.array(confidences_list) # Calcular pontuações ponderadas para cada classe class_scores = {} for class_id in [0, 1]: # pt_br=0, pt_pt=1 mask = predictions == class_id if np.any(mask): # Somar confiança de todas as predições desta classe class_scores[class_id] = np.sum(confidences[mask]) else: class_scores[class_id] = 0.0 # Classe com maior pontuação predicted_class_id = max(class_scores.keys(), key=lambda k: class_scores[k]) # Confiança final: média das confiânças da classe vencedora winner_mask = predictions == predicted_class_id if np.any(winner_mask): final_confidence = np.mean(confidences[winner_mask]) else: final_confidence = 0.0 predicted_label = self.label_map[predicted_class_id] return predicted_label, final_confidence, predicted_class_id def find_audio_files(folder_path, supported_formats=None): """ Encontra todos os arquivos de áudio recursivamente. """ if supported_formats is None: supported_formats = ['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'] audio_files = [] for root, dirs, files in os.walk(folder_path): for file in files: if any(file.lower().endswith(ext) for ext in supported_formats): full_path = os.path.join(root, file) audio_files.append(full_path) return audio_files def process_audio_folder(inference_engine, audio_folder, output_csv, samples_folder=None, samples_per_class=10, supported_formats=None): """ Processa uma pasta de áudios recursivamente usando janela deslizante. """ print(f"Processando pasta: {audio_folder}") print("Usando janela deslizante de 5s com sobreposição de 2.5s para áudios longos") # Encontrar todos os arquivos de áudio print("Buscando arquivos de áudio...") audio_files = find_audio_files(audio_folder, supported_formats) # # shuffle # random.shuffle(audio_files) # audio_files = audio_files[0:100] if not audio_files: print(f"✗ Nenhum arquivo de áudio encontrado em {audio_folder}") print(f"Formatos suportados: {supported_formats}") return print(f"✓ Encontrados {len(audio_files)} arquivos de áudio") # Resultados results = [] processed_count = 0 error_count = 0 total_segments = 0 print("\nProcessando arquivos...") for i, audio_file in enumerate(audio_files, 1): if i % 50 == 0 or i == len(audio_files): print(f"Progresso: {i}/{len(audio_files)} ({(i/len(audio_files)*100):.1f}%)") # Classificar áudio predicted_label, confidence, class_id = inference_engine.predict(audio_file) if predicted_label is not None: # Calcular caminho relativo rel_path = os.path.relpath(audio_file, audio_folder) # Calcular duração do áudio para estatísticas try: import librosa audio_duration = librosa.get_duration(filename=audio_file) segments_used = max(1, int((audio_duration - 5.0) / 2.5) + 1) if audio_duration > 5.0 else 1 total_segments += segments_used except: audio_duration = None segments_used = 1 result = { 'arquivo': os.path.basename(audio_file), 'caminho_relativo': rel_path, 'caminho_completo': audio_file, 'label_predita': predicted_label, 'confianca': confidence, 'classe_id': class_id, 'duracao_segundos': audio_duration, 'segmentos_analisados': segments_used, 'timestamp': datetime.now().isoformat() } results.append(result) processed_count += 1 else: error_count += 1 print(f"\n✓ Processamento concluído!") print(f"Arquivos processados: {processed_count}") print(f"Arquivos com erro: {error_count}") print(f"Total de segmentos analisados: {total_segments}") if not results: print("✗ Nenhum arquivo foi processado com sucesso.") return # Criar DataFrame df = pd.DataFrame(results) # Estatísticas print(f"\n=== Estatísticas ===") distribution = Counter(df['label_predita']) for label, count in distribution.items(): percentage = (count / len(results)) * 100 print(f"{label}: {count} arquivos ({percentage:.1f}%)") avg_confidence = df['confianca'].mean() print(f"Confiança média: {avg_confidence:.3f}") # Arquivos com baixa confiança low_confidence_threshold = 0.7 low_confidence = df[df['confianca'] < low_confidence_threshold] print(f"Arquivos com baixa confiança (< {low_confidence_threshold}): {len(low_confidence)} ({len(low_confidence)/len(df)*100:.1f}%)") # Salvar CSV df.to_csv(output_csv, index=False) print(f"\n✓ Resultados salvos em: {output_csv}") # Salvar amostras para verificação qualitativa if samples_folder: save_quality_samples(df, audio_folder, samples_folder, samples_per_class) return df def save_quality_samples(df, source_folder, samples_folder, samples_per_class=10): """ Salva amostras de cada classe para verificação qualitativa. """ print(f"\nSalvando amostras para verificação qualitativa...") # Criar pasta de amostras os.makedirs(samples_folder, exist_ok=True) # Para cada classe for label in df['label_predita'].unique(): print(f"Salvando amostras da classe: {label}") # Criar subpasta para a classe class_folder = os.path.join(samples_folder, label) os.makedirs(class_folder, exist_ok=True) # Filtrar arquivos da classe class_files = df[df['label_predita'] == label] # Estratégia de amostragem: metade com alta confiança, metade aleatória high_conf = class_files[class_files['confianca'] >= 0.8].sample( n=min(samples_per_class//2, len(class_files[class_files['confianca'] >= 0.8])), random_state=42 ) if len(class_files[class_files['confianca'] >= 0.8]) > 0 else pd.DataFrame() remaining_needed = samples_per_class - len(high_conf) if remaining_needed > 0: remaining_files = class_files[~class_files.index.isin(high_conf.index)] random_sample = remaining_files.sample( n=min(remaining_needed, len(remaining_files)), random_state=42 ) if len(remaining_files) > 0 else pd.DataFrame() else: random_sample = pd.DataFrame() # Combinar amostras samples = pd.concat([high_conf, random_sample]).head(samples_per_class) # Copiar arquivos sample_info = [] for idx, row in samples.iterrows(): source_path = row['caminho_completo'] filename = row['arquivo'] confidence = row['confianca'] # Nome do arquivo com confiança name, ext = os.path.splitext(filename) new_filename = f"{name}_conf{confidence:.3f}{ext}" dest_path = os.path.join(class_folder, new_filename) try: shutil.copy2(source_path, dest_path) sample_info.append({ 'arquivo_original': filename, 'arquivo_copia': new_filename, 'confianca': confidence, 'caminho_original': row['caminho_relativo'] }) except Exception as e: print(f"Erro ao copiar {source_path}: {e}") # Salvar informações das amostras if sample_info: sample_df = pd.DataFrame(sample_info) info_file = os.path.join(class_folder, "info_amostras.csv") sample_df.to_csv(info_file, index=False) print(f" ✓ {len(sample_info)} amostras salvas em {class_folder}") else: print(f" ✗ Nenhuma amostra salva para {label}") print(f"✓ Amostras salvas em: {samples_folder}") def main(): parser = argparse.ArgumentParser( description="Inferência em pasta de áudios sem labels conhecidas" ) parser.add_argument( "audio_folder", help="Pasta raiz contendo arquivos de áudio (busca recursiva)" ) parser.add_argument( "--model_path", default="./nn/results/final_model", help="Caminho para o modelo treinado" ) parser.add_argument( "--output", default="inferencia_resultados.csv", help="Arquivo CSV para salvar resultados" ) parser.add_argument( "--samples_folder", default="amostras_verificacao", help="Pasta para salvar amostras para verificação qualitativa" ) parser.add_argument( "--samples_per_class", type=int, default=100, help="Número de amostras por classe para verificação" ) parser.add_argument( "--no_samples", action="store_true", help="Não salvar amostras para verificação" ) parser.add_argument( "--formats", nargs="+", default=['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'], help="Formatos de áudio suportados" ) args = parser.parse_args() # Validações if not os.path.exists(args.audio_folder): print(f"✗ Pasta '{args.audio_folder}' não encontrada!") sys.exit(1) if not os.path.exists(args.model_path): print(f"✗ Modelo '{args.model_path}' não encontrado!") sys.exit(1) print("=== Inferência em Pasta de Áudios ===") print(f"Pasta de áudio: {args.audio_folder}") print(f"Modelo: {args.model_path}") print(f"Arquivo de saída: {args.output}") if not args.no_samples: print(f"Pasta de amostras: {args.samples_folder}") print(f"Amostras por classe: {args.samples_per_class}") print(f"Formatos suportados: {args.formats}") print() # Inicializar inferência inference_engine = AudioInference(args.model_path) # Processar pasta samples_folder = None if args.no_samples else args.samples_folder df = process_audio_folder( inference_engine=inference_engine, audio_folder=args.audio_folder, output_csv=args.output, samples_folder=samples_folder, samples_per_class=args.samples_per_class, supported_formats=args.formats ) if df is not None: print("\n=== Resumo Final ===") print(f"Total de arquivos processados: {len(df)}") print(f"Resultados salvos em: {args.output}") if not args.no_samples: print(f"Amostras para verificação em: {args.samples_folder}") print("\nClassificação concluída com sucesso! 🎉") if __name__ == "__main__": # Mostrar ajuda se nenhum argumento if len(sys.argv) == 1: print("=== Script de Inferência de Áudios ===") print() print("Este script classifica áudios em uma pasta recursivamente.") print("Não precisa de labels conhecidas - classifica tudo automaticamente.") print() print("Uso básico:") print(" python infer_audio_folder.py ") print() print("Exemplos:") print(" python infer_audio_folder.py ../audios_novos") print(" python infer_audio_folder.py ../dataset --output resultados.csv") print(" python infer_audio_folder.py ../audios --samples_per_class 20") print(" python infer_audio_folder.py ../audios --no_samples") print() print("O que o script faz:") print("1. Busca recursivamente todos os arquivos de áudio") print("2. Classifica cada um como pt_br ou pt_pt") print("3. Salva resultados detalhados em CSV") print("4. Cria amostras para verificação manual") print("5. Mostra estatísticas dos resultados") print() print("Para ver todas as opções: python infer_audio_folder.py --help") sys.exit(0) main()