LiamKhoaLe commited on
Commit
7cf238e
Β·
1 Parent(s): eaec621

Upd agent mcp error handlers

Browse files
Files changed (2) hide show
  1. agent.py +27 -6
  2. app.py +54 -27
agent.py CHANGED
@@ -34,6 +34,13 @@ except ImportError:
34
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
35
  logger = logging.getLogger(__name__)
36
 
 
 
 
 
 
 
 
37
  # Initialize Gemini
38
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
39
  if not GEMINI_API_KEY:
@@ -268,12 +275,26 @@ async def main():
268
  # Use stdio_server from mcp.server.stdio
269
  from mcp.server.stdio import stdio_server
270
 
271
- async with stdio_server() as streams:
272
- await app.run(
273
- streams[0], # read_stream
274
- streams[1], # write_stream
275
- app.create_initialization_options()
276
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
 
278
  if __name__ == "__main__":
279
  asyncio.run(main())
 
34
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
35
  logger = logging.getLogger(__name__)
36
 
37
+ # Suppress warnings from MCP SDK during initialization
38
+ # These warnings are expected during the initialization handshake
39
+ mcp_logger = logging.getLogger("mcp")
40
+ mcp_logger.setLevel(logging.ERROR) # Only show errors, suppress warnings during init
41
+ root_logger = logging.getLogger("root")
42
+ root_logger.setLevel(logging.ERROR) # Suppress root logger warnings during init
43
+
44
  # Initialize Gemini
45
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
46
  if not GEMINI_API_KEY:
 
275
  # Use stdio_server from mcp.server.stdio
276
  from mcp.server.stdio import stdio_server
277
 
278
+ # Suppress root logger warnings during initialization
279
+ # These are expected during the MCP initialization handshake
280
+ original_root_level = logging.getLogger("root").level
281
+ logging.getLogger("root").setLevel(logging.ERROR)
282
+
283
+ try:
284
+ async with stdio_server() as streams:
285
+ # Restore logging after initialization
286
+ logging.getLogger("root").setLevel(original_root_level)
287
+ logger.info("βœ… MCP server initialized and ready")
288
+
289
+ await app.run(
290
+ streams[0], # read_stream
291
+ streams[1], # write_stream
292
+ app.create_initialization_options()
293
+ )
294
+ except Exception as e:
295
+ logging.getLogger("root").setLevel(original_root_level)
296
+ logger.error(f"❌ MCP server error: {e}")
297
+ raise
298
 
299
  if __name__ == "__main__":
300
  asyncio.run(main())
app.py CHANGED
@@ -274,30 +274,47 @@ async def get_mcp_session():
274
  read, write = await stdio_ctx.__aenter__()
275
 
276
  # Create ClientSession from the streams
 
277
  session = ClientSession(read, write)
278
- await session.__aenter__()
279
 
280
- # Wait for the server to fully initialize
281
- # The server needs time to start up and be ready
282
- # Increased wait time and retries for better reliability
283
- logger.info("⏳ Waiting for MCP server to initialize...")
284
- await asyncio.sleep(2.0) # Initial wait
 
 
 
 
 
 
 
 
285
 
286
  # Verify the session works by listing tools with retries
287
- max_init_retries = 15
 
288
  tools_listed = False
289
  tools = None
290
  for init_attempt in range(max_init_retries):
291
  try:
292
  tools = await session.list_tools()
293
  if tools and hasattr(tools, 'tools') and len(tools.tools) > 0:
294
- logger.info(f"βœ… MCP server initialized with {len(tools.tools)} tools: {[t.name for t in tools.tools]}")
295
  tools_listed = True
296
  break
297
  except Exception as e:
298
- if init_attempt < max_init_retries - 1:
299
- wait_time = 0.5 * (init_attempt + 1) # Progressive wait: 0.5s, 1s, 1.5s...
300
- logger.debug(f"Initialization attempt {init_attempt + 1}/{max_init_retries} failed, waiting {wait_time}s before retry...")
 
 
 
 
 
 
 
 
301
  await asyncio.sleep(wait_time)
302
  else:
303
  logger.error(f"❌ Could not list tools after {max_init_retries} attempts: {e}")
@@ -398,8 +415,8 @@ async def call_agent(user_prompt: str, system_prompt: str = None, files: list =
398
  if temperature is not None:
399
  arguments["temperature"] = temperature
400
 
401
- logger.info(f"πŸ”§ Calling Gemini MCP tool '{generate_tool.name}' for: {user_prompt[:100]}...")
402
- logger.info(f"πŸ“‹ MCP Arguments: model={model}, temperature={temperature}, files={len(files) if files else 0}")
403
  result = await session.call_tool(generate_tool.name, arguments=arguments)
404
 
405
  # Parse result
@@ -407,9 +424,9 @@ async def call_agent(user_prompt: str, system_prompt: str = None, files: list =
407
  for item in result.content:
408
  if hasattr(item, 'text'):
409
  response_text = item.text.strip()
410
- logger.info(f"βœ… Gemini MCP returned response ({len(response_text)} chars)")
411
  return response_text
412
- logger.warning("⚠️ Gemini MCP returned empty or invalid result")
413
  return ""
414
  except Exception as e:
415
  logger.error(f"Gemini MCP call error: {e}")
@@ -770,6 +787,7 @@ async def search_web_mcp_tool(query: str, max_results: int = 5) -> list:
770
 
771
  if search_tool:
772
  try:
 
773
  # Call the search tool
774
  result = await session.call_tool(
775
  search_tool.name,
@@ -813,7 +831,7 @@ async def search_web_mcp_tool(query: str, max_results: int = 5) -> list:
813
  })
814
 
815
  if web_content:
816
- logger.info(f"Web search MCP returned {len(web_content)} results")
817
  return web_content
818
  except Exception as e:
819
  logger.error(f"Error calling web search MCP tool: {e}")
@@ -834,11 +852,12 @@ async def search_web_mcp(query: str, max_results: int = 5) -> list:
834
  # If no web search MCP tool available, use direct search (ddgs)
835
  # Note: Gemini MCP doesn't have web search capability, so we use direct API
836
  # The results will then be summarized using Gemini MCP
837
- logger.info("ℹ️ No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)")
838
  return search_web_fallback(query, max_results)
839
 
840
  def search_web_fallback(query: str, max_results: int = 5) -> list:
841
  """Fallback web search using DuckDuckGo directly (when MCP is not available)"""
 
842
  # Always import here to ensure availability
843
  try:
844
  from ddgs import DDGS
@@ -893,9 +912,10 @@ def search_web_fallback(query: str, max_results: int = 5) -> list:
893
  except Exception as e:
894
  logger.error(f"Error processing search result: {e}")
895
  continue
 
896
  return web_content
897
  except Exception as e:
898
- logger.error(f"Web search error: {e}")
899
  return []
900
 
901
  def search_web(query: str, max_results: int = 5) -> list:
@@ -933,11 +953,12 @@ def search_web(query: str, max_results: int = 5) -> list:
933
  logger.error(f"Error running async MCP search: {e}")
934
 
935
  # Only use ddgs fallback if MCP is not available or returned no results
936
- logger.warning("Falling back to direct ddgs search (MCP unavailable or returned no results)")
937
  return search_web_fallback(query, max_results)
938
 
939
  async def summarize_web_content_gemini(content_list: list, query: str) -> str:
940
  """Summarize web search results using Gemini MCP"""
 
941
  combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]])
942
 
943
  user_prompt = f"""Summarize the following web search results related to the query: "{query}"
@@ -959,6 +980,11 @@ Summary:"""
959
  temperature=0.5
960
  )
961
 
 
 
 
 
 
962
  return result.strip()
963
 
964
  def summarize_web_content(content_list: list, query: str) -> str:
@@ -1012,6 +1038,7 @@ def get_llm_for_rag(temperature=0.7, max_new_tokens=256, top_p=0.95, top_k=50):
1012
 
1013
  async def autonomous_reasoning_gemini(query: str) -> dict:
1014
  """Autonomous reasoning using Gemini MCP"""
 
1015
  reasoning_prompt = f"""Analyze this medical query and provide structured reasoning:
1016
 
1017
  Query: "{query}"
@@ -1086,17 +1113,17 @@ def autonomous_reasoning(query: str, history: list) -> dict:
1086
  }
1087
 
1088
  try:
1089
- logger.info("πŸ€” Using Gemini MCP for autonomous reasoning...")
1090
  loop = asyncio.get_event_loop()
1091
  if loop.is_running():
1092
  try:
1093
  import nest_asyncio
1094
  reasoning = nest_asyncio.run(autonomous_reasoning_gemini(query))
1095
  if reasoning and reasoning.get("query_type") != "general_info": # Check if we got real reasoning
1096
- logger.info(f"βœ… Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
1097
  return reasoning
1098
  else:
1099
- logger.warning("⚠️ Gemini MCP returned fallback reasoning, using it anyway")
1100
  return reasoning
1101
  except Exception as e:
1102
  logger.error(f"❌ Error in nested async reasoning: {e}")
@@ -1105,10 +1132,10 @@ def autonomous_reasoning(query: str, history: list) -> dict:
1105
  else:
1106
  reasoning = loop.run_until_complete(autonomous_reasoning_gemini(query))
1107
  if reasoning and reasoning.get("query_type") != "general_info":
1108
- logger.info(f"βœ… Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
1109
  return reasoning
1110
  else:
1111
- logger.warning("⚠️ Gemini MCP returned fallback reasoning, using it anyway")
1112
  return reasoning
1113
  except Exception as e:
1114
  logger.error(f"❌ Gemini MCP reasoning error: {e}")
@@ -1615,16 +1642,16 @@ def stream_chat(
1615
  web_sources = []
1616
  web_urls = [] # Store URLs for citations
1617
  if final_use_web_search:
1618
- logger.info("🌐 Performing web search (MCP if available, else direct API)...")
1619
  web_results = search_web(message, max_results=5)
1620
  if web_results:
1621
  logger.info(f"πŸ“Š Found {len(web_results)} web search results, now summarizing with Gemini MCP...")
1622
  web_summary = summarize_web_content(web_results, message)
1623
  if web_summary and len(web_summary) > 50: # Check if we got a real summary
1624
- logger.info(f"βœ… Gemini MCP summarization successful ({len(web_summary)} chars)")
1625
  web_context = f"\n\nAdditional Web Sources (summarized with Gemini MCP):\n{web_summary}"
1626
  else:
1627
- logger.warning("⚠️ Gemini MCP summarization failed or returned empty, using raw results")
1628
  # Fallback: use first result's content
1629
  web_context = f"\n\nAdditional Web Sources:\n{web_results[0].get('content', '')[:500]}"
1630
  web_sources = [r['title'] for r in web_results[:3]]
 
274
  read, write = await stdio_ctx.__aenter__()
275
 
276
  # Create ClientSession from the streams
277
+ # The __aenter__() method automatically handles the initialization handshake
278
  session = ClientSession(read, write)
 
279
 
280
+ # Wait a moment for the server process to start
281
+ await asyncio.sleep(1.0)
282
+
283
+ try:
284
+ # Initialize the session (this sends initialize request and waits for response)
285
+ await session.__aenter__()
286
+ logger.info("⏳ MCP session initialized, verifying tools...")
287
+ except Exception as e:
288
+ logger.warning(f"MCP session initialization had an issue (may be expected): {e}")
289
+ # Continue anyway - the session might still work
290
+
291
+ # Wait a bit more for the server to be fully ready after initialization
292
+ await asyncio.sleep(1.5)
293
 
294
  # Verify the session works by listing tools with retries
295
+ # This confirms the server is ready to handle requests
296
+ max_init_retries = 10
297
  tools_listed = False
298
  tools = None
299
  for init_attempt in range(max_init_retries):
300
  try:
301
  tools = await session.list_tools()
302
  if tools and hasattr(tools, 'tools') and len(tools.tools) > 0:
303
+ logger.info(f"βœ… MCP server ready with {len(tools.tools)} tools: {[t.name for t in tools.tools]}")
304
  tools_listed = True
305
  break
306
  except Exception as e:
307
+ error_str = str(e).lower()
308
+ # Ignore initialization-related errors during the handshake phase
309
+ if "initialization" in error_str or "before initialization" in error_str:
310
+ if init_attempt < max_init_retries - 1:
311
+ wait_time = 0.5 * (init_attempt + 1) # Progressive wait: 0.5s, 1s, 1.5s...
312
+ logger.debug(f"Server still initializing (attempt {init_attempt + 1}/{max_init_retries}), waiting {wait_time}s...")
313
+ await asyncio.sleep(wait_time)
314
+ continue
315
+ elif init_attempt < max_init_retries - 1:
316
+ wait_time = 0.5 * (init_attempt + 1)
317
+ logger.debug(f"Tool listing attempt {init_attempt + 1}/{max_init_retries} failed: {e}, waiting {wait_time}s...")
318
  await asyncio.sleep(wait_time)
319
  else:
320
  logger.error(f"❌ Could not list tools after {max_init_retries} attempts: {e}")
 
415
  if temperature is not None:
416
  arguments["temperature"] = temperature
417
 
418
+ logger.info(f"πŸ”§ [MCP] Calling Gemini MCP tool '{generate_tool.name}' for: {user_prompt[:100]}...")
419
+ logger.info(f"πŸ“‹ [MCP] Arguments: model={model}, temperature={temperature}, files={len(files) if files else 0}")
420
  result = await session.call_tool(generate_tool.name, arguments=arguments)
421
 
422
  # Parse result
 
424
  for item in result.content:
425
  if hasattr(item, 'text'):
426
  response_text = item.text.strip()
427
+ logger.info(f"βœ… [MCP] Gemini MCP returned response ({len(response_text)} chars)")
428
  return response_text
429
+ logger.warning("⚠️ [MCP] Gemini MCP returned empty or invalid result")
430
  return ""
431
  except Exception as e:
432
  logger.error(f"Gemini MCP call error: {e}")
 
787
 
788
  if search_tool:
789
  try:
790
+ logger.info(f"πŸ” [MCP] Using web search MCP tool '{search_tool.name}' for: {query[:100]}...")
791
  # Call the search tool
792
  result = await session.call_tool(
793
  search_tool.name,
 
831
  })
832
 
833
  if web_content:
834
+ logger.info(f"βœ… [MCP] Web search MCP tool returned {len(web_content)} results")
835
  return web_content
836
  except Exception as e:
837
  logger.error(f"Error calling web search MCP tool: {e}")
 
852
  # If no web search MCP tool available, use direct search (ddgs)
853
  # Note: Gemini MCP doesn't have web search capability, so we use direct API
854
  # The results will then be summarized using Gemini MCP
855
+ logger.info("ℹ️ [Direct API] No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)")
856
  return search_web_fallback(query, max_results)
857
 
858
  def search_web_fallback(query: str, max_results: int = 5) -> list:
859
  """Fallback web search using DuckDuckGo directly (when MCP is not available)"""
860
+ logger.info(f"πŸ” [Direct API] Performing web search using DuckDuckGo API for: {query[:100]}...")
861
  # Always import here to ensure availability
862
  try:
863
  from ddgs import DDGS
 
912
  except Exception as e:
913
  logger.error(f"Error processing search result: {e}")
914
  continue
915
+ logger.info(f"βœ… [Direct API] Web search completed: {len(web_content)} results")
916
  return web_content
917
  except Exception as e:
918
+ logger.error(f"❌ [Direct API] Web search error: {e}")
919
  return []
920
 
921
  def search_web(query: str, max_results: int = 5) -> list:
 
953
  logger.error(f"Error running async MCP search: {e}")
954
 
955
  # Only use ddgs fallback if MCP is not available or returned no results
956
+ logger.info("ℹ️ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)")
957
  return search_web_fallback(query, max_results)
958
 
959
  async def summarize_web_content_gemini(content_list: list, query: str) -> str:
960
  """Summarize web search results using Gemini MCP"""
961
+ logger.info(f"πŸ“ [MCP] Summarizing {len(content_list)} web search results using Gemini MCP...")
962
  combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]])
963
 
964
  user_prompt = f"""Summarize the following web search results related to the query: "{query}"
 
980
  temperature=0.5
981
  )
982
 
983
+ if result:
984
+ logger.info(f"βœ… [MCP] Web content summarized successfully using Gemini MCP ({len(result)} chars)")
985
+ else:
986
+ logger.warning("⚠️ [MCP] Gemini MCP summarization returned empty result")
987
+
988
  return result.strip()
989
 
990
  def summarize_web_content(content_list: list, query: str) -> str:
 
1038
 
1039
  async def autonomous_reasoning_gemini(query: str) -> dict:
1040
  """Autonomous reasoning using Gemini MCP"""
1041
+ logger.info(f"🧠 [MCP] Analyzing query with Gemini MCP: {query[:100]}...")
1042
  reasoning_prompt = f"""Analyze this medical query and provide structured reasoning:
1043
 
1044
  Query: "{query}"
 
1113
  }
1114
 
1115
  try:
1116
+ logger.info("πŸ€” [MCP] Using Gemini MCP for autonomous reasoning...")
1117
  loop = asyncio.get_event_loop()
1118
  if loop.is_running():
1119
  try:
1120
  import nest_asyncio
1121
  reasoning = nest_asyncio.run(autonomous_reasoning_gemini(query))
1122
  if reasoning and reasoning.get("query_type") != "general_info": # Check if we got real reasoning
1123
+ logger.info(f"βœ… [MCP] Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
1124
  return reasoning
1125
  else:
1126
+ logger.warning("⚠️ [MCP] Gemini MCP returned fallback reasoning, using it anyway")
1127
  return reasoning
1128
  except Exception as e:
1129
  logger.error(f"❌ Error in nested async reasoning: {e}")
 
1132
  else:
1133
  reasoning = loop.run_until_complete(autonomous_reasoning_gemini(query))
1134
  if reasoning and reasoning.get("query_type") != "general_info":
1135
+ logger.info(f"βœ… [MCP] Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
1136
  return reasoning
1137
  else:
1138
+ logger.warning("⚠️ [MCP] Gemini MCP returned fallback reasoning, using it anyway")
1139
  return reasoning
1140
  except Exception as e:
1141
  logger.error(f"❌ Gemini MCP reasoning error: {e}")
 
1642
  web_sources = []
1643
  web_urls = [] # Store URLs for citations
1644
  if final_use_web_search:
1645
+ logger.info("🌐 Performing web search (will use Gemini MCP for summarization)...")
1646
  web_results = search_web(message, max_results=5)
1647
  if web_results:
1648
  logger.info(f"πŸ“Š Found {len(web_results)} web search results, now summarizing with Gemini MCP...")
1649
  web_summary = summarize_web_content(web_results, message)
1650
  if web_summary and len(web_summary) > 50: # Check if we got a real summary
1651
+ logger.info(f"βœ… [MCP] Gemini MCP summarization successful ({len(web_summary)} chars)")
1652
  web_context = f"\n\nAdditional Web Sources (summarized with Gemini MCP):\n{web_summary}"
1653
  else:
1654
+ logger.warning("⚠️ [MCP] Gemini MCP summarization failed or returned empty, using raw results")
1655
  # Fallback: use first result's content
1656
  web_context = f"\n\nAdditional Web Sources:\n{web_results[0].get('content', '')[:500]}"
1657
  web_sources = [r['title'] for r in web_results[:3]]