MedLLM-Agent / utils.py
LiamKhoaLe's picture
Debug MCP #2
ffcfd50
raw
history blame
3.52 kB
"""Utility functions for translation, language detection, and formatting"""
import asyncio
from langdetect import detect, LangDetectException
from logger import logger
from client import MCP_AVAILABLE, call_agent
from config import GEMINI_MODEL_LITE
try:
import nest_asyncio
except ImportError:
nest_asyncio = None
def format_prompt_manually(messages: list, tokenizer) -> str:
"""Manually format prompt for models without chat template"""
system_content = ""
user_content = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
system_content = content
elif role == "user":
user_content = content
if system_content:
prompt = f"{system_content}\n\nQuestion: {user_content}\n\nAnswer:"
else:
prompt = f"Question: {user_content}\n\nAnswer:"
return prompt
def detect_language(text: str) -> str:
"""Detect language of input text"""
try:
lang = detect(text)
return lang
except LangDetectException:
return "en"
def format_url_as_domain(url: str) -> str:
"""Format URL as simple domain name (e.g., www.mayoclinic.org)"""
if not url:
return ""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
domain = parsed.netloc or parsed.path
if domain.startswith('www.'):
return domain
elif domain:
return domain
return url
except Exception:
if '://' in url:
domain = url.split('://')[1].split('/')[0]
return domain
return url
async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str:
"""Translate text using Gemini MCP"""
if source_lang:
user_prompt = f"Translate the following {source_lang} text to {target_lang}. Only provide the translation, no explanations:\n\n{text}"
else:
user_prompt = f"Translate the following text to {target_lang}. Only provide the translation, no explanations:\n\n{text}"
system_prompt = "You are a professional translator. Translate accurately and concisely."
result = await call_agent(
user_prompt=user_prompt,
system_prompt=system_prompt,
model=GEMINI_MODEL_LITE,
temperature=0.2
)
return result.strip()
def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str:
"""Translate text using Gemini MCP"""
if not MCP_AVAILABLE:
logger.warning("Gemini MCP not available for translation")
return text
try:
loop = asyncio.get_event_loop()
if loop.is_running():
if nest_asyncio:
translated = nest_asyncio.run(translate_text_gemini(text, target_lang, source_lang))
if translated:
logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
return translated
else:
logger.error("Error in nested async translation: nest_asyncio not available")
else:
translated = loop.run_until_complete(translate_text_gemini(text, target_lang, source_lang))
if translated:
logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
return translated
except Exception as e:
logger.error(f"Gemini MCP translation error: {e}")
return text