|
|
|
|
|
""" |
|
|
Script para inferência em pasta de áudios sem labels conhecidas. |
|
|
Classifica todos os áudios recursivamente e salva resultados em CSV. |
|
|
Também salva amostras para verificação qualitativa. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import argparse |
|
|
import glob |
|
|
import torch |
|
|
import librosa |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import shutil |
|
|
from pathlib import Path |
|
|
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification |
|
|
from collections import Counter |
|
|
import random |
|
|
from datetime import datetime |
|
|
|
|
|
class AudioInference: |
|
|
def __init__(self, model_path, device=None): |
|
|
""" |
|
|
Inicializa o classificador de áudio. |
|
|
""" |
|
|
self.model_path = model_path |
|
|
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
|
|
|
|
|
self.label_map = {0: "pt_br", 1: "pt_pt"} |
|
|
|
|
|
|
|
|
self._load_model() |
|
|
|
|
|
def _load_model(self): |
|
|
"""Carrega o modelo e feature extractor.""" |
|
|
print(f"Carregando modelo de: {self.model_path}") |
|
|
print(f"Usando device: {self.device}") |
|
|
|
|
|
try: |
|
|
self.feature_extractor = AutoFeatureExtractor.from_pretrained(self.model_path) |
|
|
self.model = AutoModelForAudioClassification.from_pretrained(self.model_path) |
|
|
|
|
|
self.model.to(self.device) |
|
|
self.model.eval() |
|
|
|
|
|
print("✓ Modelo carregado com sucesso!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"✗ Erro ao carregar modelo: {e}") |
|
|
sys.exit(1) |
|
|
|
|
|
def load_audio(self, file_path): |
|
|
""" |
|
|
Carrega um arquivo de áudio. |
|
|
""" |
|
|
try: |
|
|
audio, sr = librosa.load( |
|
|
file_path, |
|
|
sr=self.feature_extractor.sampling_rate, |
|
|
mono=True |
|
|
) |
|
|
return audio, sr |
|
|
except Exception as e: |
|
|
print(f"Erro ao carregar {file_path}: {e}") |
|
|
return None, None |
|
|
|
|
|
def predict(self, audio_path): |
|
|
""" |
|
|
Classifica um único arquivo de áudio usando janela deslizante. |
|
|
""" |
|
|
|
|
|
audio, sr = self.load_audio(audio_path) |
|
|
if audio is None: |
|
|
return None, None, None |
|
|
|
|
|
try: |
|
|
|
|
|
window_size = int(sr * 5.0) |
|
|
overlap = int(sr * 2.5) |
|
|
|
|
|
|
|
|
if len(audio) <= window_size: |
|
|
return self._predict_segment(audio, sr) |
|
|
|
|
|
|
|
|
predictions_list = [] |
|
|
confidences_list = [] |
|
|
|
|
|
start = 0 |
|
|
while start < len(audio): |
|
|
end = min(start + window_size, len(audio)) |
|
|
segment = audio[start:end] |
|
|
|
|
|
|
|
|
if len(segment) >= sr: |
|
|
pred_label, confidence, class_id = self._predict_segment(segment, sr) |
|
|
if pred_label is not None: |
|
|
predictions_list.append(class_id) |
|
|
confidences_list.append(confidence) |
|
|
|
|
|
|
|
|
start += window_size - overlap |
|
|
|
|
|
|
|
|
if end == len(audio): |
|
|
break |
|
|
|
|
|
if not predictions_list: |
|
|
return None, None, None |
|
|
|
|
|
|
|
|
return self._combine_predictions(predictions_list, confidences_list) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Erro ao processar {audio_path}: {e}") |
|
|
return None, None, None |
|
|
|
|
|
def _predict_segment(self, audio_segment, sr): |
|
|
""" |
|
|
Classifica um segmento individual de áudio. |
|
|
""" |
|
|
try: |
|
|
|
|
|
inputs = self.feature_extractor( |
|
|
audio_segment, |
|
|
sampling_rate=sr, |
|
|
max_length=int(sr * 5.0), |
|
|
truncation=True, |
|
|
padding=True, |
|
|
return_tensors="pt" |
|
|
) |
|
|
|
|
|
|
|
|
inputs = {k: v.to(self.device) for k, v in inputs.items()} |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = self.model(**inputs) |
|
|
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1) |
|
|
predicted_class_id = predictions.argmax().item() |
|
|
confidence = predictions.max().item() |
|
|
|
|
|
predicted_label = self.label_map[predicted_class_id] |
|
|
|
|
|
return predicted_label, confidence, predicted_class_id |
|
|
|
|
|
except Exception as e: |
|
|
return None, None, None |
|
|
|
|
|
def _combine_predictions(self, predictions_list, confidences_list): |
|
|
""" |
|
|
Combina múltiplas predições usando voto majoritário ponderado pela confiança. |
|
|
""" |
|
|
|
|
|
predictions = np.array(predictions_list) |
|
|
confidences = np.array(confidences_list) |
|
|
|
|
|
|
|
|
class_scores = {} |
|
|
for class_id in [0, 1]: |
|
|
mask = predictions == class_id |
|
|
if np.any(mask): |
|
|
|
|
|
class_scores[class_id] = np.sum(confidences[mask]) |
|
|
else: |
|
|
class_scores[class_id] = 0.0 |
|
|
|
|
|
|
|
|
predicted_class_id = max(class_scores.keys(), key=lambda k: class_scores[k]) |
|
|
|
|
|
|
|
|
winner_mask = predictions == predicted_class_id |
|
|
if np.any(winner_mask): |
|
|
final_confidence = np.mean(confidences[winner_mask]) |
|
|
else: |
|
|
final_confidence = 0.0 |
|
|
|
|
|
predicted_label = self.label_map[predicted_class_id] |
|
|
|
|
|
return predicted_label, final_confidence, predicted_class_id |
|
|
|
|
|
def find_audio_files(folder_path, supported_formats=None): |
|
|
""" |
|
|
Encontra todos os arquivos de áudio recursivamente. |
|
|
""" |
|
|
if supported_formats is None: |
|
|
supported_formats = ['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'] |
|
|
|
|
|
audio_files = [] |
|
|
for root, dirs, files in os.walk(folder_path): |
|
|
for file in files: |
|
|
if any(file.lower().endswith(ext) for ext in supported_formats): |
|
|
full_path = os.path.join(root, file) |
|
|
audio_files.append(full_path) |
|
|
|
|
|
return audio_files |
|
|
|
|
|
def process_audio_folder(inference_engine, audio_folder, output_csv, samples_folder=None, |
|
|
samples_per_class=10, supported_formats=None): |
|
|
""" |
|
|
Processa uma pasta de áudios recursivamente usando janela deslizante. |
|
|
""" |
|
|
print(f"Processando pasta: {audio_folder}") |
|
|
print("Usando janela deslizante de 5s com sobreposição de 2.5s para áudios longos") |
|
|
|
|
|
|
|
|
print("Buscando arquivos de áudio...") |
|
|
audio_files = find_audio_files(audio_folder, supported_formats) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not audio_files: |
|
|
print(f"✗ Nenhum arquivo de áudio encontrado em {audio_folder}") |
|
|
print(f"Formatos suportados: {supported_formats}") |
|
|
return |
|
|
|
|
|
print(f"✓ Encontrados {len(audio_files)} arquivos de áudio") |
|
|
|
|
|
|
|
|
results = [] |
|
|
processed_count = 0 |
|
|
error_count = 0 |
|
|
total_segments = 0 |
|
|
|
|
|
print("\nProcessando arquivos...") |
|
|
for i, audio_file in enumerate(audio_files, 1): |
|
|
if i % 50 == 0 or i == len(audio_files): |
|
|
print(f"Progresso: {i}/{len(audio_files)} ({(i/len(audio_files)*100):.1f}%)") |
|
|
|
|
|
|
|
|
predicted_label, confidence, class_id = inference_engine.predict(audio_file) |
|
|
|
|
|
if predicted_label is not None: |
|
|
|
|
|
rel_path = os.path.relpath(audio_file, audio_folder) |
|
|
|
|
|
|
|
|
try: |
|
|
import librosa |
|
|
audio_duration = librosa.get_duration(filename=audio_file) |
|
|
segments_used = max(1, int((audio_duration - 5.0) / 2.5) + 1) if audio_duration > 5.0 else 1 |
|
|
total_segments += segments_used |
|
|
except: |
|
|
audio_duration = None |
|
|
segments_used = 1 |
|
|
|
|
|
result = { |
|
|
'arquivo': os.path.basename(audio_file), |
|
|
'caminho_relativo': rel_path, |
|
|
'caminho_completo': audio_file, |
|
|
'label_predita': predicted_label, |
|
|
'confianca': confidence, |
|
|
'classe_id': class_id, |
|
|
'duracao_segundos': audio_duration, |
|
|
'segmentos_analisados': segments_used, |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
results.append(result) |
|
|
processed_count += 1 |
|
|
else: |
|
|
error_count += 1 |
|
|
|
|
|
print(f"\n✓ Processamento concluído!") |
|
|
print(f"Arquivos processados: {processed_count}") |
|
|
print(f"Arquivos com erro: {error_count}") |
|
|
print(f"Total de segmentos analisados: {total_segments}") |
|
|
|
|
|
if not results: |
|
|
print("✗ Nenhum arquivo foi processado com sucesso.") |
|
|
return |
|
|
|
|
|
|
|
|
df = pd.DataFrame(results) |
|
|
|
|
|
|
|
|
print(f"\n=== Estatísticas ===") |
|
|
distribution = Counter(df['label_predita']) |
|
|
for label, count in distribution.items(): |
|
|
percentage = (count / len(results)) * 100 |
|
|
print(f"{label}: {count} arquivos ({percentage:.1f}%)") |
|
|
|
|
|
avg_confidence = df['confianca'].mean() |
|
|
print(f"Confiança média: {avg_confidence:.3f}") |
|
|
|
|
|
|
|
|
low_confidence_threshold = 0.7 |
|
|
low_confidence = df[df['confianca'] < low_confidence_threshold] |
|
|
print(f"Arquivos com baixa confiança (< {low_confidence_threshold}): {len(low_confidence)} ({len(low_confidence)/len(df)*100:.1f}%)") |
|
|
|
|
|
|
|
|
df.to_csv(output_csv, index=False) |
|
|
print(f"\n✓ Resultados salvos em: {output_csv}") |
|
|
|
|
|
|
|
|
if samples_folder: |
|
|
save_quality_samples(df, audio_folder, samples_folder, samples_per_class) |
|
|
|
|
|
return df |
|
|
|
|
|
def save_quality_samples(df, source_folder, samples_folder, samples_per_class=10): |
|
|
""" |
|
|
Salva amostras de cada classe para verificação qualitativa. |
|
|
""" |
|
|
print(f"\nSalvando amostras para verificação qualitativa...") |
|
|
|
|
|
|
|
|
os.makedirs(samples_folder, exist_ok=True) |
|
|
|
|
|
|
|
|
for label in df['label_predita'].unique(): |
|
|
print(f"Salvando amostras da classe: {label}") |
|
|
|
|
|
|
|
|
class_folder = os.path.join(samples_folder, label) |
|
|
os.makedirs(class_folder, exist_ok=True) |
|
|
|
|
|
|
|
|
class_files = df[df['label_predita'] == label] |
|
|
|
|
|
|
|
|
high_conf = class_files[class_files['confianca'] >= 0.8].sample( |
|
|
n=min(samples_per_class//2, len(class_files[class_files['confianca'] >= 0.8])), |
|
|
random_state=42 |
|
|
) if len(class_files[class_files['confianca'] >= 0.8]) > 0 else pd.DataFrame() |
|
|
|
|
|
remaining_needed = samples_per_class - len(high_conf) |
|
|
if remaining_needed > 0: |
|
|
remaining_files = class_files[~class_files.index.isin(high_conf.index)] |
|
|
random_sample = remaining_files.sample( |
|
|
n=min(remaining_needed, len(remaining_files)), |
|
|
random_state=42 |
|
|
) if len(remaining_files) > 0 else pd.DataFrame() |
|
|
else: |
|
|
random_sample = pd.DataFrame() |
|
|
|
|
|
|
|
|
samples = pd.concat([high_conf, random_sample]).head(samples_per_class) |
|
|
|
|
|
|
|
|
sample_info = [] |
|
|
for idx, row in samples.iterrows(): |
|
|
source_path = row['caminho_completo'] |
|
|
filename = row['arquivo'] |
|
|
confidence = row['confianca'] |
|
|
|
|
|
|
|
|
name, ext = os.path.splitext(filename) |
|
|
new_filename = f"{name}_conf{confidence:.3f}{ext}" |
|
|
|
|
|
dest_path = os.path.join(class_folder, new_filename) |
|
|
|
|
|
try: |
|
|
shutil.copy2(source_path, dest_path) |
|
|
sample_info.append({ |
|
|
'arquivo_original': filename, |
|
|
'arquivo_copia': new_filename, |
|
|
'confianca': confidence, |
|
|
'caminho_original': row['caminho_relativo'] |
|
|
}) |
|
|
except Exception as e: |
|
|
print(f"Erro ao copiar {source_path}: {e}") |
|
|
|
|
|
|
|
|
if sample_info: |
|
|
sample_df = pd.DataFrame(sample_info) |
|
|
info_file = os.path.join(class_folder, "info_amostras.csv") |
|
|
sample_df.to_csv(info_file, index=False) |
|
|
|
|
|
print(f" ✓ {len(sample_info)} amostras salvas em {class_folder}") |
|
|
else: |
|
|
print(f" ✗ Nenhuma amostra salva para {label}") |
|
|
|
|
|
print(f"✓ Amostras salvas em: {samples_folder}") |
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser( |
|
|
description="Inferência em pasta de áudios sem labels conhecidas" |
|
|
) |
|
|
parser.add_argument( |
|
|
"audio_folder", |
|
|
help="Pasta raiz contendo arquivos de áudio (busca recursiva)" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--model_path", |
|
|
default="./nn/results/final_model", |
|
|
help="Caminho para o modelo treinado" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--output", |
|
|
default="inferencia_resultados.csv", |
|
|
help="Arquivo CSV para salvar resultados" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--samples_folder", |
|
|
default="amostras_verificacao", |
|
|
help="Pasta para salvar amostras para verificação qualitativa" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--samples_per_class", |
|
|
type=int, |
|
|
default=100, |
|
|
help="Número de amostras por classe para verificação" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--no_samples", |
|
|
action="store_true", |
|
|
help="Não salvar amostras para verificação" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--formats", |
|
|
nargs="+", |
|
|
default=['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'], |
|
|
help="Formatos de áudio suportados" |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
if not os.path.exists(args.audio_folder): |
|
|
print(f"✗ Pasta '{args.audio_folder}' não encontrada!") |
|
|
sys.exit(1) |
|
|
|
|
|
if not os.path.exists(args.model_path): |
|
|
print(f"✗ Modelo '{args.model_path}' não encontrado!") |
|
|
sys.exit(1) |
|
|
|
|
|
print("=== Inferência em Pasta de Áudios ===") |
|
|
print(f"Pasta de áudio: {args.audio_folder}") |
|
|
print(f"Modelo: {args.model_path}") |
|
|
print(f"Arquivo de saída: {args.output}") |
|
|
|
|
|
if not args.no_samples: |
|
|
print(f"Pasta de amostras: {args.samples_folder}") |
|
|
print(f"Amostras por classe: {args.samples_per_class}") |
|
|
|
|
|
print(f"Formatos suportados: {args.formats}") |
|
|
print() |
|
|
|
|
|
|
|
|
inference_engine = AudioInference(args.model_path) |
|
|
|
|
|
|
|
|
samples_folder = None if args.no_samples else args.samples_folder |
|
|
|
|
|
df = process_audio_folder( |
|
|
inference_engine=inference_engine, |
|
|
audio_folder=args.audio_folder, |
|
|
output_csv=args.output, |
|
|
samples_folder=samples_folder, |
|
|
samples_per_class=args.samples_per_class, |
|
|
supported_formats=args.formats |
|
|
) |
|
|
|
|
|
if df is not None: |
|
|
print("\n=== Resumo Final ===") |
|
|
print(f"Total de arquivos processados: {len(df)}") |
|
|
print(f"Resultados salvos em: {args.output}") |
|
|
|
|
|
if not args.no_samples: |
|
|
print(f"Amostras para verificação em: {args.samples_folder}") |
|
|
|
|
|
print("\nClassificação concluída com sucesso! 🎉") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
if len(sys.argv) == 1: |
|
|
print("=== Script de Inferência de Áudios ===") |
|
|
print() |
|
|
print("Este script classifica áudios em uma pasta recursivamente.") |
|
|
print("Não precisa de labels conhecidas - classifica tudo automaticamente.") |
|
|
print() |
|
|
print("Uso básico:") |
|
|
print(" python infer_audio_folder.py <pasta_de_audios>") |
|
|
print() |
|
|
print("Exemplos:") |
|
|
print(" python infer_audio_folder.py ../audios_novos") |
|
|
print(" python infer_audio_folder.py ../dataset --output resultados.csv") |
|
|
print(" python infer_audio_folder.py ../audios --samples_per_class 20") |
|
|
print(" python infer_audio_folder.py ../audios --no_samples") |
|
|
print() |
|
|
print("O que o script faz:") |
|
|
print("1. Busca recursivamente todos os arquivos de áudio") |
|
|
print("2. Classifica cada um como pt_br ou pt_pt") |
|
|
print("3. Salva resultados detalhados em CSV") |
|
|
print("4. Cria amostras para verificação manual") |
|
|
print("5. Mostra estatísticas dos resultados") |
|
|
print() |
|
|
print("Para ver todas as opções: python infer_audio_folder.py --help") |
|
|
sys.exit(0) |
|
|
|
|
|
main() |
|
|
|