#!/usr/bin/env python3
"""
SFTP Model Downloader Agent
Handles downloading model files from an SFTP server.
"""
import os
import re
import glob
import posixpath
import pysftp
from typing import List, Dict
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_openai_tools_agent


@tool
def scan_transcription_files(transcriptions_dir: str = "transcriptions") -> List[Dict[str, str]]:
    """Scan the transcriptions directory and extract model identifiers from filenames."""
    if not os.path.exists(transcriptions_dir):
        raise FileNotFoundError(
            f"Transcriptions directory not found: {transcriptions_dir}")

    transcription_files = glob.glob(os.path.join(transcriptions_dir, "*.json"))
    model_identifiers = []

    for file_path in transcription_files:
        filename = os.path.basename(file_path)
        # Extract model identifier from filename pattern: transcriptions_default.99.019111585.rtf_...
        match = re.search(r'transcriptions_(.+)\.rtf_', filename)
        if match:
            model_id = match.group(1)
            model_identifiers.append({
                'model_id': model_id,
                'filename': filename,
                'file_path': file_path,
                # Keep .rtf for SFTP download
                'model_filename': f"{model_id}.rtf",
                # Use .doc for local storage
                'local_filename': f"{model_id}.doc"
            })

    return model_identifiers


@tool
def download_model_from_sftp(model_filename: str, local_download_dir: str = "models",
                             force_download: bool = False) -> str:
    """Download a specific model file from the SFTP server and convert its extension
    from .rtf to .doc. If force_download is True, always re-download."""
    # Import configuration
    try:
        from sftp_config import get_sftp_config
        sftp_config = get_sftp_config()
    except ImportError:
        # Fallback to environment variables if the config file is not available
        sftp_config = {
            'host': os.getenv('SFTP_HOST', 'localhost'),
            'port': int(os.getenv('SFTP_PORT', '22')),
            'username': os.getenv('SFTP_USERNAME', 'user'),
            'password': os.getenv('SFTP_PASSWORD', 'password'),
            'remote_path': os.getenv('SFTP_REMOTE_PATH', '/models/')
        }

    # Create local directory if it doesn't exist
    os.makedirs(local_download_dir, exist_ok=True)

    # Convert filename from .rtf to .doc
    doc_filename = model_filename.replace('.rtf', '.doc')
    local_file_path = os.path.join(local_download_dir, doc_filename)

    # If force_download is False and the file exists, skip the download
    if not force_download and os.path.exists(local_file_path):
        print(f"ℹ️ Model already exists locally: {local_file_path}")
        return local_file_path

    try:
        # Connect to SFTP server
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None  # Disable host key checking for development

        print(
            f"🔌 Connecting to SFTP server: {sftp_config['host']}:{sftp_config['port']}")

        with pysftp.Connection(
            host=sftp_config['host'],
            port=sftp_config['port'],
            username=sftp_config['username'],
            password=sftp_config['password'],
            cnopts=cnopts
        ) as sftp:
            # Remote SFTP paths are '/'-separated, so join them with posixpath
            # rather than os.path (which would use '\\' on Windows)
            remote_file_path = posixpath.join(
                sftp_config['remote_path'], model_filename)

            # Check if the file exists on the server
            if not sftp.exists(remote_file_path):
                raise FileNotFoundError(
                    f"Model file not found on SFTP server: {remote_file_path}")

            # Get file size for progress tracking
            file_size = sftp.stat(remote_file_path).st_size
            print(
                f"📁 Found file on server: {remote_file_path} ({file_size} bytes)")

            # Download the file with the original .rtf extension first
            temp_rtf_path = os.path.join(local_download_dir, model_filename)
            sftp.get(remote_file_path, temp_rtf_path)
            print(f"📥 Downloaded model: {model_filename}")

            # Rename the file from .rtf to .doc
            if os.path.exists(local_file_path):
                os.remove(local_file_path)
            os.rename(temp_rtf_path, local_file_path)
            print(f"✅ Converted extension: {model_filename} -> {doc_filename}")

            return local_file_path

    except pysftp.AuthenticationException:
        error_msg = f"Authentication failed for SFTP server {sftp_config['host']}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)
    except pysftp.ConnectionException as e:
        error_msg = f"Connection failed to SFTP server {sftp_config['host']}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)
    except FileNotFoundError as e:
        error_msg = str(e)
        print(f"❌ {error_msg}")
        raise
    except Exception as e:
        error_msg = f"Error downloading model {model_filename}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)


@tool
def batch_download_models(model_identifiers: List[Dict[str, str]],
                          local_download_dir: str = "models") -> List[Dict[str, str]]:
    """Download multiple model files from the SFTP server in batch."""
    downloaded_files = []

    for model_info in model_identifiers:
        model_filename = model_info['model_filename']  # .rtf file for SFTP
        local_filename = model_info.get(
            'local_filename', model_filename.replace('.rtf', '.doc'))  # .doc file for local storage

        try:
            # download_model_from_sftp is a LangChain tool, so invoke it with a
            # dict of arguments rather than calling it like a plain function
            local_path = download_model_from_sftp.invoke({
                'model_filename': model_filename,
                'local_download_dir': local_download_dir
            })
            downloaded_files.append({
                'model_id': model_info['model_id'],
                'local_path': local_path,
                'local_filename': local_filename,
                'status': 'success'
            })
        except Exception as e:
            downloaded_files.append({
                'model_id': model_info['model_id'],
                'local_path': None,
                'local_filename': local_filename,
                'status': 'error',
                'error': str(e)
            })

    return downloaded_files


def create_sftp_downloader_agent(llm):
    """Create the SFTP downloader agent."""
    sftp_downloader_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an SFTP model downloader agent. Your task is to:
1. Scan the transcriptions directory to identify which models are needed
2. Download the corresponding model files from the SFTP server
3. Return the list of successfully downloaded models

You should handle errors gracefully and provide detailed feedback about the download process."""),
        ("human", "Analyze the transcriptions in {transcriptions_dir} and download the corresponding models from SFTP."),
        MessagesPlaceholder("agent_scratchpad")
    ])

    sftp_downloader_agent = create_openai_tools_agent(
        llm=llm,
        tools=[scan_transcription_files, download_model_from_sftp,
               batch_download_models],
        prompt=sftp_downloader_prompt
    )

    sftp_downloader_executor = AgentExecutor(
        agent=sftp_downloader_agent,
        tools=[scan_transcription_files, download_model_from_sftp,
               batch_download_models],
        verbose=True
    )

    return sftp_downloader_executor
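

# Minimal usage sketch: wires the agent to a chat model and runs it end to end.
# It assumes `langchain_openai` is installed, OPENAI_API_KEY is set, and the
# "gpt-4o-mini" model name is available; swap in whichever chat model the
# project actually uses.
if __name__ == "__main__":
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    executor = create_sftp_downloader_agent(llm)

    # The prompt's human message expects a `transcriptions_dir` input variable
    result = executor.invoke({"transcriptions_dir": "transcriptions"})
    print(result["output"])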