"""Web search functions""" import json import asyncio import concurrent.futures from logger import logger from client import MCP_AVAILABLE, get_mcp_session, get_cached_mcp_tools, call_agent from config import GEMINI_MODEL try: import nest_asyncio except ImportError: nest_asyncio = None from supervisor import MAX_SEARCH_STRATEGIES async def search_web_mcp_tool(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: """Search web using MCP web search tool (e.g., DuckDuckGo MCP server)""" if not MCP_AVAILABLE: return [] try: tools = await get_cached_mcp_tools() if not tools: return [] search_tool = None for tool in tools: tool_name_lower = tool.name.lower() if any(keyword in tool_name_lower for keyword in ["search", "duckduckgo", "ddg", "web"]): search_tool = tool logger.info(f"Found web search MCP tool: {tool.name}") break if not search_tool: tools = await get_cached_mcp_tools(force_refresh=True) for tool in tools: tool_name_lower = tool.name.lower() if any(keyword in tool_name_lower for keyword in ["search", "duckduckgo", "ddg", "web"]): search_tool = tool logger.info(f"Found web search MCP tool after refresh: {tool.name}") break if search_tool: try: session = await get_mcp_session() if session is None: return [] result = await session.call_tool( search_tool.name, arguments={"query": query, "max_results": max_results} ) web_content = [] if hasattr(result, 'content') and result.content: for item in result.content: if hasattr(item, 'text'): try: data = json.loads(item.text) if isinstance(data, list): for entry in data[:max_results]: web_content.append({ 'title': entry.get('title', ''), 'url': entry.get('url', entry.get('href', '')), 'content': entry.get('body', entry.get('snippet', entry.get('content', ''))) }) elif isinstance(data, dict): if 'results' in data: for entry in data['results'][:max_results]: web_content.append({ 'title': entry.get('title', ''), 'url': entry.get('url', entry.get('href', '')), 'content': entry.get('body', entry.get('snippet', entry.get('content', ''))) }) else: web_content.append({ 'title': data.get('title', ''), 'url': data.get('url', data.get('href', '')), 'content': data.get('body', data.get('snippet', data.get('content', ''))) }) except json.JSONDecodeError: web_content.append({ 'title': '', 'url': '', 'content': item.text[:1000] }) if web_content: return web_content except Exception as e: logger.error(f"Error calling web search MCP tool: {e}") else: logger.debug("No MCP web search tool discovered in current catalog") return [] except Exception as e: logger.error(f"Web search MCP tool error: {e}") return [] async def search_web_mcp(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: """Search web using MCP tools - tries web search MCP tool first, then falls back to direct search""" results = await search_web_mcp_tool(query, max_results) if results: logger.info(f"✅ Web search via MCP tool: found {len(results)} results") return results logger.info("â„šī¸ [Direct API] No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)") return search_web_fallback(query, max_results) def search_web_fallback(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: """Fallback web search using DuckDuckGo directly (when MCP is not available)""" logger.info(f"🔍 [Direct API] Performing web search using DuckDuckGo API for: {query[:100]}...") try: from ddgs import DDGS import requests from bs4 import BeautifulSoup except ImportError: logger.error("Fallback dependencies (ddgs, requests, beautifulsoup4) not available") return [] try: with DDGS() as ddgs: results = list(ddgs.text(query, max_results=max_results)) web_content = [] for result in results: try: url = result.get('href', '') title = result.get('title', '') snippet = result.get('body', '') try: response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'}) if response.status_code == 200: soup = BeautifulSoup(response.content, 'html.parser') for script in soup(["script", "style"]): script.decompose() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = ' '.join(chunk for chunk in chunks if chunk) if len(text) > 1000: text = text[:1000] + "..." web_content.append({ 'title': title, 'url': url, 'content': snippet + "\n" + text[:500] if text else snippet }) else: web_content.append({ 'title': title, 'url': url, 'content': snippet }) except: web_content.append({ 'title': title, 'url': url, 'content': snippet }) except Exception as e: logger.error(f"Error processing search result: {e}") continue logger.info(f"✅ [Direct API] Web search completed: {len(web_content)} results") return web_content except Exception as e: logger.error(f"❌ [Direct API] Web search error: {e}") return [] def search_web(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: """Search web using MCP tools (synchronous wrapper) - prioritizes MCP over direct ddgs""" if MCP_AVAILABLE: try: try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) if loop.is_running(): if nest_asyncio: results = nest_asyncio.run(search_web_mcp(query, max_results)) if results: return results else: with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, search_web_mcp(query, max_results)) results = future.result(timeout=30) if results: return results else: results = loop.run_until_complete(search_web_mcp(query, max_results)) if results: return results except Exception as e: logger.error(f"Error running async MCP search: {e}") logger.info("â„šī¸ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)") return search_web_fallback(query, max_results) async def summarize_web_content_gemini(content_list: list, query: str) -> str: """Summarize web search results using Gemini MCP""" combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]]) user_prompt = f"""Summarize the following web search results related to the query: "{query}" Extract key medical information, facts, and insights. Be concise and focus on reliable information. Search Results: {combined_content} Summary:""" system_prompt = "You are a medical information summarizer. Extract and summarize key medical facts accurately." result = await call_agent( user_prompt=user_prompt, system_prompt=system_prompt, model=GEMINI_MODEL, temperature=0.5 ) return result.strip() def summarize_web_content(content_list: list, query: str) -> str: """Summarize web search results using Gemini MCP""" if not MCP_AVAILABLE: logger.warning("Gemini MCP not available for summarization") if content_list: return content_list[0].get('content', '')[:500] return "" try: loop = asyncio.get_event_loop() if loop.is_running(): if nest_asyncio: summary = nest_asyncio.run(summarize_web_content_gemini(content_list, query)) if summary: return summary else: logger.error("Error in nested async summarization: nest_asyncio not available") else: summary = loop.run_until_complete(summarize_web_content_gemini(content_list, query)) if summary: return summary except Exception as e: logger.error(f"Gemini MCP summarization error: {e}") if content_list: return content_list[0].get('content', '')[:500] return ""