"""Utility functions for translation, language detection, and formatting""" import asyncio import re from langdetect import detect_langs, LangDetectException from logger import logger from client import MCP_AVAILABLE, call_agent from config import GEMINI_MODEL_LITE try: import nest_asyncio except ImportError: nest_asyncio = None def format_prompt_manually(messages: list, tokenizer) -> str: """Manually format prompt for models without chat template""" system_content = "" user_content = "" for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if role == "system": system_content = content elif role == "user": user_content = content if system_content: prompt = f"{system_content}\n\nQuestion: {user_content}\n\nAnswer:" else: prompt = f"Question: {user_content}\n\nAnswer:" return prompt MIN_TEXT_LENGTH_FOR_DETECTION = 12 LANG_CONFIDENCE_THRESHOLD = 0.8 ASCII_DOMINANCE_THRESHOLD = 0.97 ENGLISH_HINT_RATIO = 0.2 ENGLISH_HINT_WORDS = { "the", "and", "with", "for", "you", "your", "have", "has", "that", "this", "pain", "blood", "pressure", "please", "what", "how", "can", "should", "need" } def _ascii_ratio(text: str) -> float: if not text: return 1.0 ascii_chars = sum(1 for ch in text if ord(ch) < 128) return ascii_chars / max(len(text), 1) def _looks_english(text: str) -> bool: words = re.findall(r"[A-Za-z']+", text.lower()) if not words: return False english_hits = sum(1 for word in words if word in ENGLISH_HINT_WORDS) return english_hits / len(words) >= ENGLISH_HINT_RATIO def detect_language(text: str) -> str: """Detect language of input text with basic confidence heuristics""" if not text: return "en" sample = text.strip() if not sample: return "en" ascii_ratio = _ascii_ratio(sample) has_non_ascii = ascii_ratio < 1.0 if len(sample) < MIN_TEXT_LENGTH_FOR_DETECTION and not has_non_ascii: return "en" try: detections = detect_langs(sample) except LangDetectException: return "en" except Exception as exc: logger.debug(f"[LANG-DETECT] Unexpected error, defaulting to English: {exc}") return "en" if not detections: return "en" top = detections[0] lang_code = top.lang confidence = getattr(top, "prob", 0.0) if confidence < LANG_CONFIDENCE_THRESHOLD: return "en" if lang_code == "en": return "en" if not has_non_ascii and ascii_ratio >= ASCII_DOMINANCE_THRESHOLD and _looks_english(sample): logger.info(f"[LANG-DETECT] Overrode {lang_code} due to English heuristics (ascii_ratio={ascii_ratio:.2f})") return "en" return lang_code def format_url_as_domain(url: str) -> str: """Format URL as simple domain name (e.g., www.mayoclinic.org)""" if not url: return "" try: from urllib.parse import urlparse parsed = urlparse(url) domain = parsed.netloc or parsed.path if domain.startswith('www.'): return domain elif domain: return domain return url except Exception: if '://' in url: domain = url.split('://')[1].split('/')[0] return domain return url async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str: """Translate text using Gemini MCP""" if source_lang: user_prompt = f"Translate the following {source_lang} text to {target_lang}. Only provide the translation, no explanations:\n\n{text}" else: user_prompt = f"Translate the following text to {target_lang}. Only provide the translation, no explanations:\n\n{text}" system_prompt = "You are a professional translator. Translate accurately and concisely." result = await call_agent( user_prompt=user_prompt, system_prompt=system_prompt, model=GEMINI_MODEL_LITE, temperature=0.2 ) return result.strip() def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str: """Translate text using Gemini MCP""" if not MCP_AVAILABLE: logger.warning("Gemini MCP not available for translation") return text try: loop = asyncio.get_event_loop() if loop.is_running(): if nest_asyncio: translated = nest_asyncio.run(translate_text_gemini(text, target_lang, source_lang)) if translated: logger.info(f"Translated via Gemini MCP: {translated[:50]}...") return translated else: logger.error("Error in nested async translation: nest_asyncio not available") else: translated = loop.run_until_complete(translate_text_gemini(text, target_lang, source_lang)) if translated: logger.info(f"Translated via Gemini MCP: {translated[:50]}...") return translated except Exception as e: logger.error(f"Gemini MCP translation error: {e}") return text