portuguese-accent-classifier / infer_audio_folder.py
lgris's picture
Upload Portuguese accent classifier with TensorBoard logs
755d2a6 verified
#!/usr/bin/env python3
"""
Script para inferência em pasta de áudios sem labels conhecidas.
Classifica todos os áudios recursivamente e salva resultados em CSV.
Também salva amostras para verificação qualitativa.
"""
import os
import sys
import argparse
import glob
import torch
import librosa
import numpy as np
import pandas as pd
import shutil
from pathlib import Path
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification
from collections import Counter
import random
from datetime import datetime
class AudioInference:
def __init__(self, model_path, device=None):
"""
Inicializa o classificador de áudio.
"""
self.model_path = model_path
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Mapeamento de classes
self.label_map = {0: "pt_br", 1: "pt_pt"}
# Carregar modelo
self._load_model()
def _load_model(self):
"""Carrega o modelo e feature extractor."""
print(f"Carregando modelo de: {self.model_path}")
print(f"Usando device: {self.device}")
try:
self.feature_extractor = AutoFeatureExtractor.from_pretrained(self.model_path)
self.model = AutoModelForAudioClassification.from_pretrained(self.model_path)
self.model.to(self.device)
self.model.eval()
print("✓ Modelo carregado com sucesso!")
except Exception as e:
print(f"✗ Erro ao carregar modelo: {e}")
sys.exit(1)
def load_audio(self, file_path):
"""
Carrega um arquivo de áudio.
"""
try:
audio, sr = librosa.load(
file_path,
sr=self.feature_extractor.sampling_rate,
mono=True
)
return audio, sr
except Exception as e:
print(f"Erro ao carregar {file_path}: {e}")
return None, None
def predict(self, audio_path):
"""
Classifica um único arquivo de áudio usando janela deslizante.
"""
# Carregar áudio
audio, sr = self.load_audio(audio_path)
if audio is None:
return None, None, None
try:
# Configurações da janela deslizante
window_size = int(sr * 5.0) # Janela de 5 segundos
overlap = int(sr * 2.5) # Sobreposição de 2.5 segundos (50%)
# Se o áudio é menor que a janela, usar áudio completo
if len(audio) <= window_size:
return self._predict_segment(audio, sr)
# Aplicar janela deslizante
predictions_list = []
confidences_list = []
start = 0
while start < len(audio):
end = min(start + window_size, len(audio))
segment = audio[start:end]
# Garantir que o segmento tenha tamanho mínimo (1 segundo)
if len(segment) >= sr:
pred_label, confidence, class_id = self._predict_segment(segment, sr)
if pred_label is not None:
predictions_list.append(class_id)
confidences_list.append(confidence)
# Avançar janela
start += window_size - overlap
# Se chegou no final, parar
if end == len(audio):
break
if not predictions_list:
return None, None, None
# Combinar predições usando voto majoritário ponderado pela confiança
return self._combine_predictions(predictions_list, confidences_list)
except Exception as e:
print(f"Erro ao processar {audio_path}: {e}")
return None, None, None
def _predict_segment(self, audio_segment, sr):
"""
Classifica um segmento individual de áudio.
"""
try:
# Pré-processar segmento
inputs = self.feature_extractor(
audio_segment,
sampling_rate=sr,
max_length=int(sr * 5.0),
truncation=True,
padding=True,
return_tensors="pt"
)
# Mover para device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Fazer predição
with torch.no_grad():
outputs = self.model(**inputs)
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
predicted_class_id = predictions.argmax().item()
confidence = predictions.max().item()
predicted_label = self.label_map[predicted_class_id]
return predicted_label, confidence, predicted_class_id
except Exception as e:
return None, None, None
def _combine_predictions(self, predictions_list, confidences_list):
"""
Combina múltiplas predições usando voto majoritário ponderado pela confiança.
"""
# Converter para arrays numpy
predictions = np.array(predictions_list)
confidences = np.array(confidences_list)
# Calcular pontuações ponderadas para cada classe
class_scores = {}
for class_id in [0, 1]: # pt_br=0, pt_pt=1
mask = predictions == class_id
if np.any(mask):
# Somar confiança de todas as predições desta classe
class_scores[class_id] = np.sum(confidences[mask])
else:
class_scores[class_id] = 0.0
# Classe com maior pontuação
predicted_class_id = max(class_scores.keys(), key=lambda k: class_scores[k])
# Confiança final: média das confiânças da classe vencedora
winner_mask = predictions == predicted_class_id
if np.any(winner_mask):
final_confidence = np.mean(confidences[winner_mask])
else:
final_confidence = 0.0
predicted_label = self.label_map[predicted_class_id]
return predicted_label, final_confidence, predicted_class_id
def find_audio_files(folder_path, supported_formats=None):
"""
Encontra todos os arquivos de áudio recursivamente.
"""
if supported_formats is None:
supported_formats = ['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac']
audio_files = []
for root, dirs, files in os.walk(folder_path):
for file in files:
if any(file.lower().endswith(ext) for ext in supported_formats):
full_path = os.path.join(root, file)
audio_files.append(full_path)
return audio_files
def process_audio_folder(inference_engine, audio_folder, output_csv, samples_folder=None,
samples_per_class=10, supported_formats=None):
"""
Processa uma pasta de áudios recursivamente usando janela deslizante.
"""
print(f"Processando pasta: {audio_folder}")
print("Usando janela deslizante de 5s com sobreposição de 2.5s para áudios longos")
# Encontrar todos os arquivos de áudio
print("Buscando arquivos de áudio...")
audio_files = find_audio_files(audio_folder, supported_formats)
# # shuffle
# random.shuffle(audio_files)
# audio_files = audio_files[0:100]
if not audio_files:
print(f"✗ Nenhum arquivo de áudio encontrado em {audio_folder}")
print(f"Formatos suportados: {supported_formats}")
return
print(f"✓ Encontrados {len(audio_files)} arquivos de áudio")
# Resultados
results = []
processed_count = 0
error_count = 0
total_segments = 0
print("\nProcessando arquivos...")
for i, audio_file in enumerate(audio_files, 1):
if i % 50 == 0 or i == len(audio_files):
print(f"Progresso: {i}/{len(audio_files)} ({(i/len(audio_files)*100):.1f}%)")
# Classificar áudio
predicted_label, confidence, class_id = inference_engine.predict(audio_file)
if predicted_label is not None:
# Calcular caminho relativo
rel_path = os.path.relpath(audio_file, audio_folder)
# Calcular duração do áudio para estatísticas
try:
import librosa
audio_duration = librosa.get_duration(filename=audio_file)
segments_used = max(1, int((audio_duration - 5.0) / 2.5) + 1) if audio_duration > 5.0 else 1
total_segments += segments_used
except:
audio_duration = None
segments_used = 1
result = {
'arquivo': os.path.basename(audio_file),
'caminho_relativo': rel_path,
'caminho_completo': audio_file,
'label_predita': predicted_label,
'confianca': confidence,
'classe_id': class_id,
'duracao_segundos': audio_duration,
'segmentos_analisados': segments_used,
'timestamp': datetime.now().isoformat()
}
results.append(result)
processed_count += 1
else:
error_count += 1
print(f"\n✓ Processamento concluído!")
print(f"Arquivos processados: {processed_count}")
print(f"Arquivos com erro: {error_count}")
print(f"Total de segmentos analisados: {total_segments}")
if not results:
print("✗ Nenhum arquivo foi processado com sucesso.")
return
# Criar DataFrame
df = pd.DataFrame(results)
# Estatísticas
print(f"\n=== Estatísticas ===")
distribution = Counter(df['label_predita'])
for label, count in distribution.items():
percentage = (count / len(results)) * 100
print(f"{label}: {count} arquivos ({percentage:.1f}%)")
avg_confidence = df['confianca'].mean()
print(f"Confiança média: {avg_confidence:.3f}")
# Arquivos com baixa confiança
low_confidence_threshold = 0.7
low_confidence = df[df['confianca'] < low_confidence_threshold]
print(f"Arquivos com baixa confiança (< {low_confidence_threshold}): {len(low_confidence)} ({len(low_confidence)/len(df)*100:.1f}%)")
# Salvar CSV
df.to_csv(output_csv, index=False)
print(f"\n✓ Resultados salvos em: {output_csv}")
# Salvar amostras para verificação qualitativa
if samples_folder:
save_quality_samples(df, audio_folder, samples_folder, samples_per_class)
return df
def save_quality_samples(df, source_folder, samples_folder, samples_per_class=10):
"""
Salva amostras de cada classe para verificação qualitativa.
"""
print(f"\nSalvando amostras para verificação qualitativa...")
# Criar pasta de amostras
os.makedirs(samples_folder, exist_ok=True)
# Para cada classe
for label in df['label_predita'].unique():
print(f"Salvando amostras da classe: {label}")
# Criar subpasta para a classe
class_folder = os.path.join(samples_folder, label)
os.makedirs(class_folder, exist_ok=True)
# Filtrar arquivos da classe
class_files = df[df['label_predita'] == label]
# Estratégia de amostragem: metade com alta confiança, metade aleatória
high_conf = class_files[class_files['confianca'] >= 0.8].sample(
n=min(samples_per_class//2, len(class_files[class_files['confianca'] >= 0.8])),
random_state=42
) if len(class_files[class_files['confianca'] >= 0.8]) > 0 else pd.DataFrame()
remaining_needed = samples_per_class - len(high_conf)
if remaining_needed > 0:
remaining_files = class_files[~class_files.index.isin(high_conf.index)]
random_sample = remaining_files.sample(
n=min(remaining_needed, len(remaining_files)),
random_state=42
) if len(remaining_files) > 0 else pd.DataFrame()
else:
random_sample = pd.DataFrame()
# Combinar amostras
samples = pd.concat([high_conf, random_sample]).head(samples_per_class)
# Copiar arquivos
sample_info = []
for idx, row in samples.iterrows():
source_path = row['caminho_completo']
filename = row['arquivo']
confidence = row['confianca']
# Nome do arquivo com confiança
name, ext = os.path.splitext(filename)
new_filename = f"{name}_conf{confidence:.3f}{ext}"
dest_path = os.path.join(class_folder, new_filename)
try:
shutil.copy2(source_path, dest_path)
sample_info.append({
'arquivo_original': filename,
'arquivo_copia': new_filename,
'confianca': confidence,
'caminho_original': row['caminho_relativo']
})
except Exception as e:
print(f"Erro ao copiar {source_path}: {e}")
# Salvar informações das amostras
if sample_info:
sample_df = pd.DataFrame(sample_info)
info_file = os.path.join(class_folder, "info_amostras.csv")
sample_df.to_csv(info_file, index=False)
print(f" ✓ {len(sample_info)} amostras salvas em {class_folder}")
else:
print(f" ✗ Nenhuma amostra salva para {label}")
print(f"✓ Amostras salvas em: {samples_folder}")
def main():
parser = argparse.ArgumentParser(
description="Inferência em pasta de áudios sem labels conhecidas"
)
parser.add_argument(
"audio_folder",
help="Pasta raiz contendo arquivos de áudio (busca recursiva)"
)
parser.add_argument(
"--model_path",
default="./nn/results/final_model",
help="Caminho para o modelo treinado"
)
parser.add_argument(
"--output",
default="inferencia_resultados.csv",
help="Arquivo CSV para salvar resultados"
)
parser.add_argument(
"--samples_folder",
default="amostras_verificacao",
help="Pasta para salvar amostras para verificação qualitativa"
)
parser.add_argument(
"--samples_per_class",
type=int,
default=100,
help="Número de amostras por classe para verificação"
)
parser.add_argument(
"--no_samples",
action="store_true",
help="Não salvar amostras para verificação"
)
parser.add_argument(
"--formats",
nargs="+",
default=['.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'],
help="Formatos de áudio suportados"
)
args = parser.parse_args()
# Validações
if not os.path.exists(args.audio_folder):
print(f"✗ Pasta '{args.audio_folder}' não encontrada!")
sys.exit(1)
if not os.path.exists(args.model_path):
print(f"✗ Modelo '{args.model_path}' não encontrado!")
sys.exit(1)
print("=== Inferência em Pasta de Áudios ===")
print(f"Pasta de áudio: {args.audio_folder}")
print(f"Modelo: {args.model_path}")
print(f"Arquivo de saída: {args.output}")
if not args.no_samples:
print(f"Pasta de amostras: {args.samples_folder}")
print(f"Amostras por classe: {args.samples_per_class}")
print(f"Formatos suportados: {args.formats}")
print()
# Inicializar inferência
inference_engine = AudioInference(args.model_path)
# Processar pasta
samples_folder = None if args.no_samples else args.samples_folder
df = process_audio_folder(
inference_engine=inference_engine,
audio_folder=args.audio_folder,
output_csv=args.output,
samples_folder=samples_folder,
samples_per_class=args.samples_per_class,
supported_formats=args.formats
)
if df is not None:
print("\n=== Resumo Final ===")
print(f"Total de arquivos processados: {len(df)}")
print(f"Resultados salvos em: {args.output}")
if not args.no_samples:
print(f"Amostras para verificação em: {args.samples_folder}")
print("\nClassificação concluída com sucesso! 🎉")
if __name__ == "__main__":
# Mostrar ajuda se nenhum argumento
if len(sys.argv) == 1:
print("=== Script de Inferência de Áudios ===")
print()
print("Este script classifica áudios em uma pasta recursivamente.")
print("Não precisa de labels conhecidas - classifica tudo automaticamente.")
print()
print("Uso básico:")
print(" python infer_audio_folder.py <pasta_de_audios>")
print()
print("Exemplos:")
print(" python infer_audio_folder.py ../audios_novos")
print(" python infer_audio_folder.py ../dataset --output resultados.csv")
print(" python infer_audio_folder.py ../audios --samples_per_class 20")
print(" python infer_audio_folder.py ../audios --no_samples")
print()
print("O que o script faz:")
print("1. Busca recursivamente todos os arquivos de áudio")
print("2. Classifica cada um como pt_br ou pt_pt")
print("3. Salva resultados detalhados em CSV")
print("4. Cria amostras para verificação manual")
print("5. Mostra estatísticas dos resultados")
print()
print("Para ver todas as opções: python infer_audio_folder.py --help")
sys.exit(0)
main()