MedLLM-Agent / utils.py
Y Phung Nguyen
Run Gemini in thread to avoid timeout
1fc52ea
raw
history blame
5.24 kB
"""Utility functions for translation, language detection, and formatting"""
import asyncio
import re
from langdetect import detect_langs, LangDetectException
from logger import logger
from client import MCP_AVAILABLE, call_agent
from config import GEMINI_MODEL_LITE
try:
import nest_asyncio
except ImportError:
nest_asyncio = None
def format_prompt_manually(messages: list, tokenizer) -> str:
    """Build a plain "Question / Answer" prompt for models without a chat template.

    Only ``system`` and ``user`` messages are considered; when a role occurs
    more than once, the last occurrence wins. Messages lacking a ``role`` key
    are treated as ``user`` messages. ``tokenizer`` is accepted but is not
    used by this function.

    Args:
        messages: Chat messages as dicts with optional ``role``/``content`` keys.
        tokenizer: Unused here.

    Returns:
        The prompt text, prefixed by the system content when it is non-empty.
    """
    system_text = user_text = ""
    for entry in messages:
        speaker = entry.get("role", "user")
        body = entry.get("content", "")
        if speaker == "user":
            user_text = body
        elif speaker == "system":
            system_text = body

    qa_section = f"Question: {user_text}\n\nAnswer:"
    return f"{system_text}\n\n{qa_section}" if system_text else qa_section
# Pure-ASCII inputs shorter than this many characters skip detection
# and are reported as English.
MIN_TEXT_LENGTH_FOR_DETECTION = 12

# Minimum probability of the top detection candidate; anything lower
# falls back to English.
LANG_CONFIDENCE_THRESHOLD = 0.8

# ASCII share required before the English-hint override is considered.
ASCII_DOMINANCE_THRESHOLD = 0.97

# Minimum fraction of words that must appear in ENGLISH_HINT_WORDS for a
# text to be treated as English-looking.
ENGLISH_HINT_RATIO = 0.2

ENGLISH_HINT_WORDS = {
    # Common function words
    "the", "and", "with", "for", "that", "this",
    "you", "your", "have", "has",
    "what", "how", "can", "should", "need", "please",
    # Medical vocabulary
    "pain", "blood", "pressure",
}
def _ascii_ratio(text: str) -> float:
if not text:
return 1.0
ascii_chars = sum(1 for ch in text if ord(ch) < 128)
return ascii_chars / max(len(text), 1)
def _looks_english(text: str) -> bool:
    """Report whether enough words of *text* are known English hint words.

    The text is lower-cased and split into runs of ASCII letters and
    apostrophes. Returns ``False`` when no such run exists; otherwise returns
    whether the share of runs found in ``ENGLISH_HINT_WORDS`` reaches
    ``ENGLISH_HINT_RATIO``.
    """
    tokens = re.findall(r"[A-Za-z']+", text.lower())
    if len(tokens) == 0:
        return False
    matched = [token for token in tokens if token in ENGLISH_HINT_WORDS]
    return len(matched) / len(tokens) >= ENGLISH_HINT_RATIO
def detect_language(text: str) -> str:
    """Detect the language code of *text*, defaulting to ``"en"`` when unsure.

    English is returned when:
      * the input is empty or whitespace-only;
      * the input is pure ASCII and shorter than
        ``MIN_TEXT_LENGTH_FOR_DETECTION``;
      * ``detect_langs`` raises or returns no candidates;
      * the top candidate's probability is below ``LANG_CONFIDENCE_THRESHOLD``;
      * the top candidate is English;
      * the input is pure ASCII and ``_looks_english`` accepts it.

    Otherwise the top candidate's language code is returned.
    """
    sample = text.strip() if text else ""
    if not sample:
        return "en"

    ascii_share = _ascii_ratio(sample)
    pure_ascii = ascii_share >= 1.0

    # Very short ASCII snippets are assumed to be English without detection.
    if pure_ascii and len(sample) < MIN_TEXT_LENGTH_FOR_DETECTION:
        return "en"

    try:
        candidates = detect_langs(sample)
    except LangDetectException:
        return "en"
    except Exception as exc:
        logger.debug(f"[LANG-DETECT] Unexpected error, defaulting to English: {exc}")
        return "en"

    if not candidates:
        return "en"

    best = candidates[0]
    detected = best.lang
    if getattr(best, "prob", 0.0) < LANG_CONFIDENCE_THRESHOLD or detected == "en":
        return "en"

    # NOTE(review): the threshold comparison is always true here because
    # pure_ascii already implies an ASCII share of 1.0 — confirm whether
    # mostly-ASCII text was meant to qualify for the override as well.
    if pure_ascii and ascii_share >= ASCII_DOMINANCE_THRESHOLD and _looks_english(sample):
        logger.info(f"[LANG-DETECT] Overrode {detected} due to English heuristics (ascii_ratio={ascii_share:.2f})")
        return "en"

    return detected
def format_url_as_domain(url: str) -> str:
    """Format a URL as a simple domain name (e.g., www.mayoclinic.org).

    Args:
        url: Absolute URL, or a scheme-less one such as
            ``www.mayoclinic.org/some/page``.

    Returns:
        The network location of *url* when it has one. For a scheme-less URL,
        the text before the first ``/`` of its path. An empty string for
        empty input, and *url* unchanged when no domain can be extracted.
    """
    if not url:
        return ""

    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[" in
        # the host); fall back to a plain string split.
        if "://" in url:
            return url.split("://")[1].split("/")[0]
        return url

    if parsed.netloc:
        return parsed.netloc

    # Without a scheme, urlparse puts "host/path" entirely into .path, so
    # keep only the leading segment rather than returning the whole path.
    host = parsed.path.split("/", 1)[0]
    return host or url
async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str:
    """Translate *text* into *target_lang* through ``call_agent``.

    Args:
        text: Text to translate.
        target_lang: Language named in the prompt as the translation target.
        source_lang: Optional source language; when given it is named in the
            prompt, otherwise the prompt leaves the source unspecified.

    Returns:
        The agent's reply with surrounding whitespace stripped.
    """
    # The two prompt variants differ only in whether the source language is named.
    subject = f"{source_lang} text" if source_lang else "text"
    user_prompt = (
        f"Translate the following {subject} to {target_lang}. "
        f"Only provide the translation, no explanations:\n\n{text}"
    )
    system_prompt = "You are a professional translator. Translate accurately and concisely."

    reply = await call_agent(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        model=GEMINI_MODEL_LITE,
        temperature=0.2,
    )
    return reply.strip()
def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str:
    """Translate *text* using Gemini MCP, synchronously and best-effort.

    Drives ``translate_text_gemini`` to completion from synchronous code.
    When an event loop is already running in the current thread, the loop is
    made re-entrant with ``nest_asyncio.apply`` (if nest_asyncio is
    installed) before running the coroutine on it. Otherwise the thread's
    event loop is used, and one is created when none is set.

    Args:
        text: Text to translate.
        target_lang: Target language passed to ``translate_text_gemini``.
        source_lang: Optional source language passed through unchanged.

    Returns:
        The translated text, or the original *text* when MCP is unavailable,
        the translation is empty, or any error occurs (errors are logged and
        never raised).
    """
    if not MCP_AVAILABLE:
        logger.warning("Gemini MCP not available for translation")
        return text

    try:
        try:
            loop = asyncio.get_running_loop()
            loop_is_running = True
        except RuntimeError:
            # No loop is running in this thread.
            loop_is_running = False
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # get_event_loop() raises when the thread has no loop set
                # (e.g. worker threads).
                loop = None
            if loop is None or loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

        if loop_is_running:
            if not nest_asyncio:
                logger.error("Error in nested async translation: nest_asyncio not available")
                return text
            # nest_asyncio's entry point is apply(); it patches the loop so
            # run_until_complete() can be called while the loop is running.
            nest_asyncio.apply(loop)

        translated = loop.run_until_complete(
            translate_text_gemini(text, target_lang, source_lang)
        )
        if translated:
            logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
            return translated
    except Exception as e:
        logger.error(f"Gemini MCP translation error: {e}")

    return text