File size: 5,239 Bytes
52b4ed7
 
ef322a1
 
52b4ed7
ffcfd50
52b4ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef322a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52b4ed7
ef322a1
 
 
 
 
 
 
 
 
 
 
 
52b4ed7
ef322a1
52b4ed7
 
ef322a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52b4ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Utility functions for translation, language detection, and formatting"""
import asyncio
import re
from langdetect import detect_langs, LangDetectException
from logger import logger
from client import MCP_AVAILABLE, call_agent
from config import GEMINI_MODEL_LITE

try:
    import nest_asyncio
except ImportError:
    nest_asyncio = None


def format_prompt_manually(messages: list, tokenizer) -> str:
    """Build a plain "Question/Answer" prompt from chat-style messages.

    Used for models that ship without a chat template. Only ``system`` and
    ``user`` messages are considered; when several messages share a role,
    the last one wins. A message without a ``role`` key counts as ``user``.

    Args:
        messages: Dict-like messages with optional ``role`` and ``content`` keys.
        tokenizer: Accepted for interface compatibility; not used here.

    Returns:
        The prompt text, prefixed by the system content when it is non-empty.
    """
    system_text = ""
    user_text = ""

    for entry in messages:
        text = entry.get("content", "")
        speaker = entry.get("role", "user")
        if speaker == "system":
            system_text = text
        elif speaker == "user":
            user_text = text

    question_part = f"Question: {user_text}\n\nAnswer:"
    # The system text, when present, is placed ahead of the question.
    return f"{system_text}\n\n{question_part}" if system_text else question_part


# Tuning knobs for detect_language() and its helpers.

# Pure-ASCII samples shorter than this many characters are reported as "en"
# without running the detector.
MIN_TEXT_LENGTH_FOR_DETECTION = 12

# A top detection with probability below this is reported as "en".
LANG_CONFIDENCE_THRESHOLD = 0.8

# Minimum ASCII share required before the English-hint override may apply.
ASCII_DOMINANCE_THRESHOLD = 0.97

# Minimum share of tokens that must be hint words for text to "look English".
ENGLISH_HINT_RATIO = 0.2

# Lower-case words whose presence suggests English text.
ENGLISH_HINT_WORDS = {
    "the", "and", "with", "for",
    "you", "your",
    "have", "has",
    "that", "this",
    "pain", "blood", "pressure",
    "please", "what", "how",
    "can", "should", "need",
}


def _ascii_ratio(text: str) -> float:
    if not text:
        return 1.0
    ascii_chars = sum(1 for ch in text if ord(ch) < 128)
    return ascii_chars / max(len(text), 1)


def _looks_english(text: str) -> bool:
    words = re.findall(r"[A-Za-z']+", text.lower())
    if not words:
        return False
    english_hits = sum(1 for word in words if word in ENGLISH_HINT_WORDS)
    return english_hits / len(words) >= ENGLISH_HINT_RATIO


def detect_language(text: str) -> str:
    """Guess the language code of ``text``, defaulting to English when unsure.

    "en" is returned when the input is empty or whitespace, when a pure-ASCII
    sample is shorter than MIN_TEXT_LENGTH_FOR_DETECTION, when the detector
    fails or returns nothing, when the top detection is below
    LANG_CONFIDENCE_THRESHOLD, or when a pure-ASCII sample passes the
    English-hint heuristic. Otherwise the detector's top language code is
    returned.
    """
    sample = text.strip() if text else ""
    if not sample:
        return "en"

    ascii_ratio = _ascii_ratio(sample)
    is_pure_ascii = not (ascii_ratio < 1.0)

    # Short ASCII-only snippets are not sent to the detector at all.
    if is_pure_ascii and len(sample) < MIN_TEXT_LENGTH_FOR_DETECTION:
        return "en"

    try:
        candidates = detect_langs(sample)
    except LangDetectException:
        return "en"
    except Exception as exc:
        logger.debug(f"[LANG-DETECT] Unexpected error, defaulting to English: {exc}")
        return "en"

    if not candidates:
        return "en"

    best = candidates[0]
    lang_code = best.lang
    if getattr(best, "prob", 0.0) < LANG_CONFIDENCE_THRESHOLD or lang_code == "en":
        return "en"

    # NOTE(review): the pure-ASCII requirement already implies a ratio of 1.0,
    # so the ASCII_DOMINANCE_THRESHOLD comparison looks redundant - confirm
    # whether mostly-ASCII text was also meant to qualify for the override.
    english_override = (
        is_pure_ascii
        and ascii_ratio >= ASCII_DOMINANCE_THRESHOLD
        and _looks_english(sample)
    )
    if not english_override:
        return lang_code

    logger.info(f"[LANG-DETECT] Overrode {lang_code} due to English heuristics (ascii_ratio={ascii_ratio:.2f})")
    return "en"


def format_url_as_domain(url: str) -> str:
    """Reduce a URL to its host portion (e.g., www.mayoclinic.org).

    Args:
        url: A URL with or without a scheme. Falsy input yields "".

    Returns:
        The network location of ``url``. For scheme-less input such as
        "www.mayoclinic.org/page" the text before the first "/" is returned.
        When no host can be extracted, the path (or the original ``url``)
        is returned instead.
    """
    if not url:
        return ""

    # Imported locally so the helper does not rely on a file-level import.
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed input (e.g. an unbalanced IPv6
        # bracket); fall back to slicing the string by hand.
        if "://" in url:
            return url.split("://", 1)[1].split("/", 1)[0]
        return url

    if parsed.netloc:
        return parsed.netloc

    # Without a scheme, urlparse leaves the host at the front of the path,
    # so keep only the part before the first "/" and drop the rest.
    host = parsed.path.split("/", 1)[0]
    return host or parsed.path or url


async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str:
    """Request a translation of ``text`` through ``call_agent``.

    Args:
        text: The text to translate.
        target_lang: Language named in the prompt as the translation target.
        source_lang: Optional source language; when given it is named in the
            prompt, otherwise the prompt does not mention a source language.

    Returns:
        The agent's reply with surrounding whitespace stripped.
    """
    # Only the noun phrase differs between the two prompt variants.
    subject = f"{source_lang} text" if source_lang else "text"
    request = (
        f"Translate the following {subject} to {target_lang}. "
        "Only provide the translation, no explanations:"
        f"\n\n{text}"
    )

    reply = await call_agent(
        user_prompt=request,
        system_prompt="You are a professional translator. Translate accurately and concisely.",
        model=GEMINI_MODEL_LITE,
        temperature=0.2,
    )
    return reply.strip()


def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str:
    """Translate ``text`` synchronously via Gemini MCP, falling back to the input.

    Args:
        text: The text to translate.
        target_lang: Target language passed on to ``translate_text_gemini``.
        source_lang: Optional source language passed on to
            ``translate_text_gemini``.

    Returns:
        The translated text. The original ``text`` is returned unchanged when
        MCP is unavailable, when an event loop is already running and
        ``nest_asyncio`` is not installed, when the translation is empty, or
        when any error occurs (errors are logged, never raised).
    """
    if not MCP_AVAILABLE:
        logger.warning("Gemini MCP not available for translation")
        return text

    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            if not nest_asyncio:
                logger.error("Error in nested async translation: nest_asyncio not available")
                return text
            # nest_asyncio has no module-level run(); apply() patches the loop
            # so that run_until_complete() can be re-entered while it runs.
            nest_asyncio.apply(loop)
        translated = loop.run_until_complete(
            translate_text_gemini(text, target_lang, source_lang)
        )
    except Exception as e:
        # Best-effort by design: any failure falls back to the untranslated text.
        logger.error(f"Gemini MCP translation error: {e}")
        return text

    if translated:
        logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
        return translated
    return text