"""Web search functions"""
import json
import asyncio
import concurrent.futures
from logger import logger
from client import MCP_AVAILABLE, get_mcp_session, get_cached_mcp_tools, call_agent
from config import GEMINI_MODEL

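# nest_asyncio is optional; when present it lets the synchronous wrappers below
# drive coroutines from inside an already-running event loop (e.g. Jupyter or
# an async web framework).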
try:
    import nest_asyncio
except ImportError:
    nest_asyncio = None

from supervisor import MAX_SEARCH_STRATEGIES

async def search_web_mcp_tool(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Search web using MCP web search tool (e.g., DuckDuckGo MCP server)"""
    if not MCP_AVAILABLE:
        return []
    
    try:
        tools = await get_cached_mcp_tools()
        if not tools:
            return []
        
        def _find_search_tool(tool_list):
            # Match any tool whose name suggests web search capability.
            for tool in tool_list:
                if any(keyword in tool.name.lower() for keyword in ("search", "duckduckgo", "ddg", "web")):
                    return tool
            return None

        search_tool = _find_search_tool(tools)
        if search_tool:
            logger.info(f"Found web search MCP tool: {search_tool.name}")
        else:
            # The cached catalog may be stale; refresh once and retry discovery.
            tools = await get_cached_mcp_tools(force_refresh=True)
            search_tool = _find_search_tool(tools)
            if search_tool:
                logger.info(f"Found web search MCP tool after refresh: {search_tool.name}")
        
        if not search_tool:
            logger.debug("No MCP web search tool discovered in current catalog")
            return []

        session = await get_mcp_session()
        if session is None:
            return []

        result = await session.call_tool(
            search_tool.name,
            arguments={"query": query, "max_results": max_results}
        )

        def _to_result(entry: dict) -> dict:
            # Normalize the varying key names different servers use.
            return {
                'title': entry.get('title', ''),
                'url': entry.get('url', entry.get('href', '')),
                'content': entry.get('body', entry.get('snippet', entry.get('content', '')))
            }

        web_content = []
        if hasattr(result, 'content') and result.content:
            for item in result.content:
                if not hasattr(item, 'text'):
                    continue
                try:
                    data = json.loads(item.text)
                except json.JSONDecodeError:
                    # Not JSON: keep the raw text as a single result.
                    web_content.append({'title': '', 'url': '', 'content': item.text[:1000]})
                    continue
                if isinstance(data, list):
                    web_content.extend(_to_result(entry) for entry in data[:max_results])
                elif isinstance(data, dict) and 'results' in data:
                    web_content.extend(_to_result(entry) for entry in data['results'][:max_results])
                elif isinstance(data, dict):
                    web_content.append(_to_result(data))
        # Always return a list (the old code could fall through and return None).
        return web_content
    except Exception as e:
        logger.error(f"Web search MCP tool error: {e}")
        return []

async def search_web_mcp(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Search web using MCP tools - tries web search MCP tool first, then falls back to direct search"""
    results = await search_web_mcp_tool(query, max_results)
    if results:
        logger.info(f"✅ Web search via MCP tool: found {len(results)} results")
        return results
    
    logger.info("ℹ️ [Direct API] No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)")
    return search_web_fallback(query, max_results)


def search_web_fallback(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Fallback web search using DuckDuckGo directly (when MCP is not available)"""
    logger.info(f"🔍 [Direct API] Performing web search using DuckDuckGo API for: {query[:100]}...")
    try:
        from ddgs import DDGS
        import requests
        from bs4 import BeautifulSoup
    except ImportError:
        logger.error("Fallback dependencies (ddgs, requests, beautifulsoup4) not available")
        return []
    
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
            web_content = []
            for result in results:
                try:
                    url = result.get('href', '')
                    title = result.get('title', '')
                    snippet = result.get('body', '')
                    
                    try:
                        # Best-effort fetch of the page body to enrich the snippet.
                        response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'})
                        if response.status_code == 200:
                            soup = BeautifulSoup(response.content, 'html.parser')
                            for tag in soup(["script", "style"]):
                                tag.decompose()
                            # Collapse the page text into single-spaced prose.
                            text = soup.get_text()
                            lines = (line.strip() for line in text.splitlines())
                            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
                            text = ' '.join(chunk for chunk in chunks if chunk)
                            web_content.append({
                                'title': title,
                                'url': url,
                                'content': f"{snippet}\n{text[:500]}" if text else snippet
                            })
                        else:
                            web_content.append({
                                'title': title,
                                'url': url,
                                'content': snippet
                            })
                    except Exception:
                        # Page fetch or parsing failed; keep the snippet alone.
                        web_content.append({
                            'title': title,
                            'url': url,
                            'content': snippet
                        })
                except Exception as e:
                    logger.error(f"Error processing search result: {e}")
                    continue
            logger.info(f"✅ [Direct API] Web search completed: {len(web_content)} results")
            return web_content
    except Exception as e:
        logger.error(f"❌ [Direct API] Web search error: {e}")
        return []

def search_web(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Search web using MCP tools (synchronous wrapper) - prioritizes MCP over direct ddgs"""
    if MCP_AVAILABLE:
        try:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            
            if loop.is_running():
                if nest_asyncio:
                    # nest_asyncio exposes apply(), not run(): patch the running
                    # loop so run_until_complete can be nested.
                    nest_asyncio.apply(loop)
                    results = loop.run_until_complete(search_web_mcp(query, max_results))
                    if results:
                        return results
                else:
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        future = executor.submit(asyncio.run, search_web_mcp(query, max_results))
                        results = future.result(timeout=30)
                        if results:
                            return results
            else:
                results = loop.run_until_complete(search_web_mcp(query, max_results))
                if results:
                    return results
        except Exception as e:
            logger.error(f"Error running async MCP search: {e}")
    
    logger.info("ℹ️ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)")
    return search_web_fallback(query, max_results)

async def summarize_web_content_gemini(content_list: list, query: str) -> str:
    """Summarize web search results using Gemini MCP"""
    combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]])
    
    user_prompt = f"""Summarize the following web search results related to the query: "{query}"
Extract key medical information, facts, and insights. Be concise and focus on reliable information.
Search Results:
{combined_content}
Summary:"""
    
    system_prompt = "You are a medical information summarizer. Extract and summarize key medical facts accurately."
    
    result = await call_agent(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        model=GEMINI_MODEL,
        temperature=0.5
    )
    
    return result.strip()

def summarize_web_content(content_list: list, query: str) -> str:
    """Summarize web search results using Gemini MCP"""
    if not MCP_AVAILABLE:
        logger.warning("Gemini MCP not available for summarization")
        if content_list:
            return content_list[0].get('content', '')[:500]
        return ""
    
    try:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        if loop.is_running():
            if nest_asyncio:
                # Patch the running loop so run_until_complete can be nested
                # (nest_asyncio has no run(); apply() is its API).
                nest_asyncio.apply(loop)
                summary = loop.run_until_complete(summarize_web_content_gemini(content_list, query))
                if summary:
                    return summary
            else:
                logger.error("Cannot summarize inside a running event loop: nest_asyncio not available")
        else:
            summary = loop.run_until_complete(summarize_web_content_gemini(content_list, query))
            if summary:
                return summary
    except Exception as e:
        logger.error(f"Gemini MCP summarization error: {e}")
    
    if content_list:
        return content_list[0].get('content', '')[:500]
    return ""