Spaces:
Running
on
Zero
Running
on
Zero
File size: 11,085 Bytes
52b4ed7 ffcfd50 52b4ed7 e6bba1f 52b4ed7 e6bba1f 52b4ed7 e6bba1f 52b4ed7 e6bba1f 52b4ed7 1fc52ea 52b4ed7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
"""Web search functions"""
import json
import asyncio
import concurrent.futures
from logger import logger
from client import MCP_AVAILABLE, get_mcp_session, get_cached_mcp_tools, call_agent
from config import GEMINI_MODEL
try:
import nest_asyncio
except ImportError:
nest_asyncio = None
from supervisor import MAX_SEARCH_STRATEGIES
# Substrings that mark an MCP tool as a web-search tool (matched case-insensitively).
_WEB_SEARCH_TOOL_KEYWORDS = ("search", "duckduckgo", "ddg", "web")


def _find_web_search_tool(tools):
    """Return the first tool whose name contains a web-search keyword, else None."""
    # `tools or ()` tolerates a None/empty catalog (e.g. after a failed refresh).
    for tool in tools or ():
        tool_name_lower = tool.name.lower()
        if any(keyword in tool_name_lower for keyword in _WEB_SEARCH_TOOL_KEYWORDS):
            return tool
    return None


def _normalize_search_entry(entry: dict) -> dict:
    """Map one raw search hit onto the {'title', 'url', 'content'} shape."""
    return {
        'title': entry.get('title', ''),
        'url': entry.get('url', entry.get('href', '')),
        'content': entry.get('body', entry.get('snippet', entry.get('content', ''))),
    }


def _parse_search_payload(text: str, max_results: int) -> list:
    """Convert one text payload returned by the tool into normalized result dicts."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: keep the raw text (capped) as a single untitled result.
        return [{'title': '', 'url': '', 'content': text[:1000]}]
    if isinstance(data, dict):
        if 'results' not in data:
            # A lone JSON object is treated as one search hit.
            return [_normalize_search_entry(data)]
        data = data['results']
    if not isinstance(data, list):
        return []
    # Skip non-dict entries instead of letting one malformed hit abort the batch.
    return [
        _normalize_search_entry(entry)
        for entry in data[:max_results]
        if isinstance(entry, dict)
    ]


async def search_web_mcp_tool(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Search the web through an MCP web-search tool (e.g. a DuckDuckGo MCP server).

    The cached tool catalog is scanned for a tool whose name contains one of
    the web-search keywords; if none is found the catalog is refreshed once
    and scanned again.

    Args:
        query: Search query forwarded to the tool.
        max_results: Maximum number of results requested from the tool and
            kept from each returned payload.

    Returns:
        A list of dicts with 'title', 'url' and 'content' keys. The list is
        empty when MCP is unavailable, no suitable tool or session exists,
        the tool returns nothing usable, or any error occurs (errors are
        logged, never raised).
    """
    if not MCP_AVAILABLE:
        return []
    try:
        tools = await get_cached_mcp_tools()
        if not tools:
            return []

        search_tool = _find_web_search_tool(tools)
        if search_tool:
            logger.info(f"Found web search MCP tool: {search_tool.name}")
        else:
            # The cache may be stale; refresh once before giving up.
            tools = await get_cached_mcp_tools(force_refresh=True)
            search_tool = _find_web_search_tool(tools)
            if search_tool:
                logger.info(f"Found web search MCP tool after refresh: {search_tool.name}")

        if not search_tool:
            logger.debug("No MCP web search tool discovered in current catalog")
            return []

        try:
            session = await get_mcp_session()
            if session is None:
                return []
            result = await session.call_tool(
                search_tool.name,
                arguments={"query": query, "max_results": max_results},
            )
            web_content = []
            for item in getattr(result, 'content', None) or ():
                text = getattr(item, 'text', None)
                # Only string payloads can be parsed; ignore anything else.
                if isinstance(text, str):
                    web_content.extend(_parse_search_payload(text, max_results))
            return web_content
        except Exception as e:
            logger.error(f"Error calling web search MCP tool: {e}")
        return []
    except Exception as e:
        logger.error(f"Web search MCP tool error: {e}")
        return []
async def search_web_mcp(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Search the web via MCP, falling back to the direct DuckDuckGo search.

    Args:
        query: Search query.
        max_results: Maximum number of results to request.

    Returns:
        The MCP tool's results when it yields any; otherwise whatever
        ``search_web_fallback`` returns (possibly an empty list).
    """
    results = await search_web_mcp_tool(query, max_results)
    if results:
        logger.info(f"✅ Web search via MCP tool: found {len(results)} results")
        return results
    logger.info("ℹ️ [Direct API] No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)")
    # The fallback performs blocking network I/O; run it in the default
    # executor so the event loop is not stalled while pages are fetched.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, search_web_fallback, query, max_results)
def search_web_fallback(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Fallback web search that queries DuckDuckGo directly (no MCP involved).

    Each hit's page is fetched on a best-effort basis and up to 500
    characters of its visible text are appended to the search snippet.

    Args:
        query: Search query.
        max_results: Maximum number of hits requested from DuckDuckGo.

    Returns:
        A list of dicts with 'title', 'url' and 'content' keys; an empty list
        when the optional dependencies are missing or the search itself fails.
    """
    logger.info(f"🔍 [Direct API] Performing web search using DuckDuckGo API for: {query[:100]}...")
    try:
        from ddgs import DDGS
        import requests
        from bs4 import BeautifulSoup
    except ImportError:
        logger.error("Fallback dependencies (ddgs, requests, beautifulsoup4) not available")
        return []

    def fetch_page_text(url: str) -> str:
        """Return the whitespace-normalized visible text of *url*, or '' on a non-200 reply."""
        response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'})
        if response.status_code != 200:
            return ''
        soup = BeautifulSoup(response.content, 'html.parser')
        # Script and style bodies are not visible text; drop them before extraction.
        for tag in soup(["script", "style"]):
            tag.decompose()
        lines = (line.strip() for line in soup.get_text().splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        return ' '.join(chunk for chunk in chunks if chunk)

    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))

        web_content = []
        for result in results:
            try:
                url = result.get('href', '')
                title = result.get('title', '')
                snippet = result.get('body', '')

                content = snippet
                if url:
                    try:
                        page_text = fetch_page_text(url)
                    except Exception as e:
                        # Enrichment is best-effort: keep the bare snippet, but do not
                        # swallow KeyboardInterrupt/SystemExit as a bare `except:` would.
                        logger.debug(f"Could not fetch page content for {url}: {e}")
                    else:
                        if page_text:
                            content = snippet + "\n" + page_text[:500]

                web_content.append({
                    'title': title,
                    'url': url,
                    'content': content,
                })
            except Exception as e:
                logger.error(f"Error processing search result: {e}")
                continue

        logger.info(f"✅ [Direct API] Web search completed: {len(web_content)} results")
        return web_content
    except Exception as e:
        logger.error(f"❌ [Direct API] Web search error: {e}")
        return []
def search_web(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
    """Synchronous web search that prefers MCP over the direct DuckDuckGo path.

    Args:
        query: Search query.
        max_results: Maximum number of results to request.

    Returns:
        A list of result dicts ('title', 'url', 'content'). When MCP is
        unavailable, fails, or yields nothing, the result of
        ``search_web_fallback`` is returned instead.
    """
    if MCP_AVAILABLE:
        try:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No event loop in this thread yet: create and register one.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            if not loop.is_running():
                results = loop.run_until_complete(search_web_mcp(query, max_results))
            elif nest_asyncio:
                # nest_asyncio has no `run()`; its API is `apply()`, which patches
                # the loop so run_until_complete() may be re-entered while running.
                nest_asyncio.apply(loop)
                results = loop.run_until_complete(search_web_mcp(query, max_results))
            else:
                # Run the coroutine on a fresh loop in a worker thread. The executor
                # is shut down without waiting so the 30 s timeout really bounds the
                # wait (a `with` block would join the worker on exit).
                executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
                try:
                    future = executor.submit(asyncio.run, search_web_mcp(query, max_results))
                    results = future.result(timeout=30)
                finally:
                    executor.shutdown(wait=False)

            if results:
                return results
        except Exception as e:
            logger.error(f"Error running async MCP search: {e}")
    logger.info("ℹ️ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)")
    return search_web_fallback(query, max_results)
async def summarize_web_content_gemini(content_list: list, query: str) -> str:
    """Summarize web search results with the Gemini model through ``call_agent``.

    Only the first three entries of ``content_list`` are included in the prompt.

    Args:
        content_list: Search results as dicts; the 'title' and 'content' keys
            are read, and missing keys are treated as empty strings.
        query: The query the results were retrieved for.

    Returns:
        The stripped summary text, or an empty string when there is nothing
        to summarize or the agent returns no text.
    """
    if not content_list:
        # Nothing to summarize: do not ask the model to summarize an empty prompt.
        return ""
    combined_content = "\n\n".join(
        f"Source: {item.get('title', '')}\n{item.get('content', '')}"
        for item in content_list[:3]
    )
    user_prompt = f"""Summarize the following web search results related to the query: "{query}"
Extract key medical information, facts, and insights. Be concise and focus on reliable information.
Search Results:
{combined_content}
Summary:"""
    system_prompt = "You are a medical information summarizer. Extract and summarize key medical facts accurately."
    result = await call_agent(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        model=GEMINI_MODEL,
        temperature=0.5
    )
    # Tolerate an agent that returns None instead of a string.
    return (result or "").strip()
def summarize_web_content(content_list: list, query: str) -> str:
    """Synchronous wrapper that summarizes web search results using Gemini MCP.

    Args:
        content_list: Search results as dicts with a 'content' key.
        query: The query the results were retrieved for.

    Returns:
        The Gemini summary when one is produced. Otherwise the first 500
        characters of the first result's content, or an empty string when
        ``content_list`` is empty.
    """
    if not MCP_AVAILABLE:
        logger.warning("Gemini MCP not available for summarization")
    else:
        try:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Threads other than the main one have no loop by default;
                # create one rather than silently degrading to truncation.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            if loop.is_running() and not nest_asyncio:
                logger.error("Error in nested async summarization: nest_asyncio not available")
            else:
                if loop.is_running():
                    # nest_asyncio has no `run()`; `apply()` patches the loop so
                    # run_until_complete() can be re-entered while it is running.
                    nest_asyncio.apply(loop)
                summary = loop.run_until_complete(summarize_web_content_gemini(content_list, query))
                if summary:
                    return summary
        except Exception as e:
            logger.error(f"Gemini MCP summarization error: {e}")
    # Fallback: a truncated copy of the first result's content.
    if content_list:
        return content_list[0].get('content', '')[:500]
    return ""
|