LiamKhoaLe committed
Commit 3d6d107 · 1 Parent(s): 377bc0f

Simplify MCP arch

Files changed (2):
  1. agent.py +11 -47
  2. app.py +40 -297
agent.py CHANGED
@@ -32,16 +32,14 @@ except ImportError:
     sys.exit(1)
 
 # Configure logging
-# Use DEBUG level to see all MCP protocol messages
-logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
 
-# Enable detailed logging for MCP protocol debugging
-# We want to see what requests the server receives
+# Set MCP logging to WARNING to reduce noise
 mcp_logger = logging.getLogger("mcp")
-mcp_logger.setLevel(logging.DEBUG) # Show all MCP protocol messages for debugging
+mcp_logger.setLevel(logging.WARNING)
 root_logger = logging.getLogger("root")
-root_logger.setLevel(logging.INFO) # Show info level for root logger
+root_logger.setLevel(logging.INFO)
 
 # Initialize Gemini
 GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
@@ -121,8 +119,6 @@ def prepare_gemini_files(files: list) -> list:
 @app.list_tools()
 async def list_tools() -> list[Tool]:
     """List available tools"""
-    logger.info("📋 MCP server received list_tools request")
-    logger.debug("list_tools() called - preparing tool list")
     try:
         tools = [
             Tool(
@@ -164,13 +160,9 @@ async def list_tools() -> list[Tool]:
                 }
             )
         ]
-        logger.info(f"✅ MCP server returning {len(tools)} tools: {[t.name for t in tools]}")
-        logger.debug(f"Tool details: {tools[0].name} - {tools[0].description[:50]}...")
         return tools
     except Exception as e:
-        logger.error(f"Error in list_tools(): {e}")
-        import traceback
-        logger.debug(f"list_tools error traceback:\n{traceback.format_exc()}")
+        logger.error(f"Error in list_tools(): {e}")
         raise
 
 @app.call_tool()
@@ -243,7 +235,6 @@ async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageC
 
                 # Extract text from response
                 if response and hasattr(response, 'text') and response.text:
-                    logger.info(f"✅ Gemini generated content successfully ({len(response.text)} chars)")
                     return [TextContent(type="text", text=response.text)]
                 elif response and hasattr(response, 'candidates') and response.candidates:
                     # Try to extract text from candidates if response is a list of candidates
@@ -255,7 +246,6 @@ async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageC
                                 text_parts.append(part.text)
                     if text_parts:
                         text = ''.join(text_parts)
-                        logger.info(f"✅ Gemini generated content successfully ({len(text)} chars)")
                         return [TextContent(type="text", text=text)]
                     else:
                         logger.warning("Gemini returned response but no text found")
@@ -266,14 +256,10 @@ async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageC
 
             except Exception as e:
                 logger.error(f"Error generating content: {e}")
-                import traceback
-                logger.debug(traceback.format_exc())
                 return [TextContent(type="text", text=f"Error: {str(e)}")]
 
         except Exception as e:
             logger.error(f"Error in generate_content: {e}")
-            import traceback
-            logger.debug(traceback.format_exc())
             return [TextContent(type="text", text=f"Error: {str(e)}")]
     else:
         return [TextContent(type="text", text=f"Unknown tool: {name}")]
@@ -295,15 +281,10 @@ async def main():
     logging.getLogger("root").setLevel(logging.INFO)
 
     try:
-        logger.info("🔄 Setting up stdio_server...")
         async with stdio_server() as streams:
-            logger.info("✅ MCP server stdio streams ready")
-            logger.debug(f"Streams: read={type(streams[0])}, write={type(streams[1])}")
-
             # Create initialization options
             # The Server class will automatically provide its capabilities based on
             # the registered @app.list_tools() and @app.call_tool() handlers
-            logger.debug("Preparing server capabilities...")
            try:
                 # Try to get capabilities from the server if the method exists
                 if hasattr(app, 'get_capabilities'):
@@ -314,42 +295,29 @@ async def main():
                             notification_options=NotificationOptions(),
                             experimental_capabilities={}
                         )
-                        logger.debug(f"Got capabilities with NotificationOptions: {server_capabilities}")
-                    except (ImportError, AttributeError, TypeError) as e:
-                        logger.debug(f"NotificationOptions not available: {e}, trying without...")
+                    except (ImportError, AttributeError, TypeError):
                         # Fallback: try without NotificationOptions
                         try:
                             server_capabilities = app.get_capabilities()
-                            logger.debug(f"Got capabilities: {server_capabilities}")
-                        except Exception as e2:
-                            logger.debug(f"get_capabilities() failed: {e2}, using empty dict")
+                        except Exception:
                             # If get_capabilities doesn't work, create minimal capabilities
                             server_capabilities = {}
                 else:
-                    logger.debug("Server doesn't have get_capabilities method, using empty dict")
                     # Server will provide capabilities automatically, use empty dict
                     server_capabilities = {}
-            except Exception as e:
-                logger.warning(f"Could not get server capabilities: {e}, server will provide defaults")
-                import traceback
-                logger.debug(f"Capabilities error traceback:\n{traceback.format_exc()}")
+            except Exception:
                 # Server will handle capabilities automatically
                 server_capabilities = {}
 
             # Create initialization options
-            # The server_name and server_version are required
-            # Capabilities will be automatically determined by the Server from registered handlers
-            logger.info("📋 Creating initialization options...")
             init_options = InitializationOptions(
                 server_name="gemini-mcp-server",
                 server_version="1.0.0",
                 capabilities=server_capabilities
             )
-            logger.debug(f"Initialization options: {init_options}")
 
             # Run the server with initialization options
+            logger.info("MCP server ready")
             try:
                 await app.run(
                     read_stream=streams[0],
@@ -357,15 +325,11 @@ async def main():
                     initialization_options=init_options
                 )
             except Exception as run_error:
-                logger.error(f"Error in app.run(): {run_error}")
-                import traceback
-                logger.debug(f"app.run() error traceback:\n{traceback.format_exc()}")
+                logger.error(f"Error in app.run(): {run_error}")
                 raise
     except Exception as e:
         logging.getLogger("root").setLevel(original_root_level)
-        logger.error(f"MCP server fatal error: {type(e).__name__}: {e}")
-        import traceback
-        logger.debug(f"Server error traceback:\n{traceback.format_exc()}")
+        logger.error(f"MCP server fatal error: {type(e).__name__}: {e}")
        raise
 
 if __name__ == "__main__":
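The simplified agent.py keeps only the essential stdio-server wiring: register handlers, derive capabilities, build InitializationOptions, run. For reference, a minimal sketch of that same pattern, assuming the mcp Python SDK names already used in this file (Server, stdio_server, NotificationOptions, InitializationOptions); the import paths and the placeholder echo tool are illustrative, not the actual Gemini generate_content tool:

# Minimal sketch of the simplified stdio-server pattern in agent.py (assumed
# mcp SDK import paths; the "echo" tool is a placeholder, not the Gemini tool).
import asyncio
import logging

from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool

logging.basicConfig(level=logging.INFO)
app = Server("gemini-mcp-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    # One illustrative tool with a trivial JSON schema
    return [Tool(name="echo", description="Echo back the given text",
                 inputSchema={"type": "object", "properties": {"text": {"type": "string"}}})]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    return [TextContent(type="text", text=arguments.get("text", ""))]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        init_options = InitializationOptions(
            server_name="gemini-mcp-server",
            server_version="1.0.0",
            capabilities=app.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            ),
        )
        await app.run(read_stream=read_stream, write_stream=write_stream,
                      initialization_options=init_options)

if __name__ == "__main__":
    asyncio.run(main())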
app.py CHANGED
@@ -34,13 +34,12 @@ from llama_index.embeddings.huggingface import HuggingFaceEmbedding
 from tqdm import tqdm
 from langdetect import detect, LangDetectException
 os.environ["TOKENIZERS_PARALLELISM"] = "false"
-# Enable DEBUG logging for MCP troubleshooting
-# Set to logging.INFO in production for less verbose output
-logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+# Set logging to INFO level for cleaner output
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
-# Enable MCP client debugging
+# Set MCP client logging to WARNING to reduce noise
 mcp_client_logger = logging.getLogger("mcp.client")
-mcp_client_logger.setLevel(logging.DEBUG)
+mcp_client_logger.setLevel(logging.WARNING)
 hf_logging.set_verbosity_error()
 
 # MCP imports
@@ -54,14 +53,6 @@ try:
     except ImportError:
         pass # nest_asyncio is optional
 
-    # Check MCP SDK version for debugging
-    try:
-        import mcp
-        mcp_version = getattr(mcp, '__version__', 'unknown')
-        logger.debug(f"MCP SDK version: {mcp_version}")
-    except:
-        logger.debug("Could not determine MCP SDK version")
-
     MCP_AVAILABLE = True
 except ImportError as e:
     logger.warning(f"MCP SDK not available: {e}")
@@ -238,29 +229,8 @@ async def get_mcp_session():
 
     # Check if session exists and is still valid
     if global_mcp_session is not None:
-        try:
-            # Test if session is still alive by listing tools
-            logger.debug("Testing existing MCP session...")
-            await global_mcp_session.list_tools()
-            logger.debug("Existing MCP session is valid")
-            return global_mcp_session
-        except Exception as e:
-            logger.warning(f"Existing MCP session invalid, recreating: {e}")
-            import traceback
-            logger.debug(f"Session validation error traceback: {traceback.format_exc()}")
-            # Clean up old session
-            try:
-                if global_mcp_session is not None:
-                    await global_mcp_session.__aexit__(None, None, None)
-            except Exception as cleanup_error:
-                logger.debug(f"Error cleaning up session: {cleanup_error}")
-            try:
-                if global_mcp_stdio_ctx is not None:
-                    await global_mcp_stdio_ctx.__aexit__(None, None, None)
-            except Exception as cleanup_error:
-                logger.debug(f"Error cleaning up stdio context: {cleanup_error}")
-            global_mcp_session = None
-            global_mcp_stdio_ctx = None
+        # Trust that existing session is valid - verify only when actually using it
+        return global_mcp_session
 
     # Create new session using correct MCP SDK pattern
     try:
@@ -268,14 +238,12 @@ async def get_mcp_session():
         mcp_env = os.environ.copy()
         if GEMINI_API_KEY:
             mcp_env["GEMINI_API_KEY"] = GEMINI_API_KEY
-            logger.debug("GEMINI_API_KEY set in MCP server environment")
         else:
             logger.warning("GEMINI_API_KEY not set in environment. Gemini MCP features may not work.")
 
         # Add other Gemini MCP configuration if set
         if os.environ.get("GEMINI_MODEL"):
             mcp_env["GEMINI_MODEL"] = os.environ.get("GEMINI_MODEL")
-            logger.debug(f"GEMINI_MODEL set to: {mcp_env['GEMINI_MODEL']}")
         if os.environ.get("GEMINI_TIMEOUT"):
             mcp_env["GEMINI_TIMEOUT"] = os.environ.get("GEMINI_TIMEOUT")
         if os.environ.get("GEMINI_MAX_OUTPUT_TOKENS"):
@@ -283,8 +251,7 @@ async def get_mcp_session():
         if os.environ.get("GEMINI_TEMPERATURE"):
             mcp_env["GEMINI_TEMPERATURE"] = os.environ.get("GEMINI_TEMPERATURE")
 
-        logger.info(f"Creating MCP client session with command: {MCP_SERVER_COMMAND} {MCP_SERVER_ARGS}")
-        logger.debug(f"MCP server args type: {type(MCP_SERVER_ARGS)}, value: {MCP_SERVER_ARGS}")
+        logger.info("Creating MCP client session...")
 
         server_params = StdioServerParameters(
             command=MCP_SERVER_COMMAND,
@@ -292,203 +259,37 @@ async def get_mcp_session():
             env=mcp_env
         )
 
-        logger.debug("Creating stdio_client context manager...")
         # Correct MCP SDK usage: stdio_client is an async context manager
         # that yields (read, write) streams
         stdio_ctx = stdio_client(server_params)
-        logger.debug("Entering stdio_client context...")
         read, write = await stdio_ctx.__aenter__()
-        logger.debug("✅ stdio_client context entered, streams obtained")
 
         # Wait for the server process to start and be ready
-        logger.info("⏳ Waiting for MCP server process to start...")
-        await asyncio.sleep(2.0) # Initial wait for server startup
+        await asyncio.sleep(1.5) # Wait for server startup
 
         # Create ClientSession from the streams
-        logger.debug("Creating ClientSession from streams...")
-        logger.debug(f"Read stream type: {type(read)}, Write stream type: {type(write)}")
-
-        # Check if ClientSession has any required initialization parameters
-        try:
-            import inspect
-            session_init_sig = inspect.signature(ClientSession.__init__)
-            logger.debug(f"ClientSession.__init__ signature: {session_init_sig}")
-        except:
-            pass
-
         session = ClientSession(read, write)
-        logger.debug(f"ClientSession object created: {type(session)}")
 
         # Initialize the session (this sends initialize request and waits for response + initialized notification)
-        logger.info("🔄 Initializing MCP session (sending initialize request)...")
-        logger.debug("About to call session.__aenter__() for initialization handshake...")
+        # The __aenter__() method handles the complete initialization handshake:
+        # 1. Sends initialize request with client info
+        # 2. Waits for initialize response from server
+        # 3. Waits for initialized notification from server (this is critical!)
+        # According to MCP protocol spec, the client MUST wait for the initialized notification
+        # before sending any other requests (like list_tools)
         try:
-            # The __aenter__() method handles the complete initialization handshake:
-            # 1. Sends initialize request with client info
-            # 2. Waits for initialize response from server
-            # 3. Waits for initialized notification from server (this is critical!)
-            # According to MCP protocol spec, the client MUST wait for the initialized notification
-            # before sending any other requests (like list_tools)
-            init_result = await session.__aenter__()
-            logger.info(f"✅ MCP session __aenter__() completed")
-            logger.debug(f"Initialization result: {init_result}")
-            if hasattr(init_result, '__dict__'):
-                logger.debug(f"Init result attributes: {init_result.__dict__}")
+            await session.__aenter__()
+            logger.info("✅ MCP session initialized")
+
+            # After __aenter__() completes, wait a bit longer to ensure server's internal state is ready
+            # The server may have sent the initialized notification but needs time to set up handlers
+            await asyncio.sleep(1.0) # Give server time to finalize internal state
         except Exception as e:
             error_msg = str(e)
             error_type = type(e).__name__
             logger.error(f"❌ MCP session initialization failed: {error_type}: {error_msg}")
-            import traceback
-            full_traceback = traceback.format_exc()
-            logger.debug(f"Initialization error traceback:\n{full_traceback}")
-
-            # Try to get more details about the error
-            if hasattr(e, 'args') and e.args:
-                logger.debug(f"Error args: {e.args}")
-            if hasattr(e, '__dict__'):
-                logger.debug(f"Error dict: {e.__dict__}")
-
-            # Check if this is a protocol error
-            if "invalid" in error_msg.lower() or "parameter" in error_msg.lower():
-                logger.error("⚠️ This appears to be a protocol-level error.")
-                logger.error(" Possible causes:")
-                logger.error(" 1. Server not ready to receive initialize request")
-                logger.error(" 2. Protocol version mismatch between client and server")
-                logger.error(" 3. Malformed initialization request")
-                logger.error(" 4. Server rejected initialization parameters")
 
             # Clean up and return None
-            try:
-                await stdio_ctx.__aexit__(None, None, None)
-            except Exception as cleanup_error:
-                logger.debug(f"Error during cleanup: {cleanup_error}")
-            return None
-
-        # After __aenter__() completes, the session should be fully initialized
-        # However, there may be a race condition where the server has sent the
-        # initialized notification but hasn't finished setting up its internal state.
-        # We'll add a small initial delay and then verify by attempting to list tools with retries.
-        logger.debug("Waiting briefly for server to finalize initialization state...")
-        await asyncio.sleep(0.1) # Small delay to let server finalize internal state
-
-        # Verify the session works by listing tools with retries
-        # This handles any edge cases where the server needs additional time
-        logger.info("🔍 Verifying MCP session by listing tools...")
-        max_init_retries = 3
-        tools_listed = False
-        tools = None
-        last_error = None
-
-        for init_attempt in range(max_init_retries):
-            try:
-                logger.debug(f"Attempting to list tools (attempt {init_attempt + 1}/{max_init_retries})...")
-                # Use a timeout to prevent hanging
-                tools = await asyncio.wait_for(session.list_tools(), timeout=10.0)
-                logger.debug(f"list_tools() returned: type={type(tools)}, value={tools}")
-
-                if tools and hasattr(tools, 'tools') and len(tools.tools) > 0:
-                    tool_names = [t.name for t in tools.tools]
-                    logger.info(f"✅ MCP server ready with {len(tools.tools)} tools: {tool_names}")
-                    tools_listed = True
-                    break
-                else:
-                    logger.warning(f"list_tools() returned empty or invalid result: {tools}")
-                    if init_attempt < max_init_retries - 1:
-                        # Exponential backoff for empty results
-                        wait_time = 0.5 * (2 ** init_attempt)
-                        logger.debug(f"Empty result, waiting {wait_time:.2f}s before retry...")
-                        await asyncio.sleep(wait_time)
-                        continue
-            except asyncio.TimeoutError:
-                last_error = Exception("list_tools() timed out after 10 seconds")
-                logger.warning(f"⏱️ list_tools() timed out (attempt {init_attempt + 1}/{max_init_retries})")
-                if init_attempt < max_init_retries - 1:
-                    wait_time = 1.0 * (init_attempt + 1)
-                    await asyncio.sleep(wait_time)
-                    continue
-            except Exception as e:
-                last_error = e
-                error_str = str(e).lower()
-                error_msg = str(e)
-                error_type = type(e).__name__
-
-                # Check error data for more details
-                error_data_msg = ""
-                if hasattr(e, '__dict__') and 'error' in e.__dict__:
-                    error_obj = e.__dict__.get('error')
-                    if hasattr(error_obj, 'message'):
-                        error_data_msg = error_obj.message.lower()
-                    elif hasattr(error_obj, 'data') and error_obj.data:
-                        error_data_msg = str(error_obj.data).lower()
-
-                # Combine error messages for detection
-                combined_error = f"{error_str} {error_data_msg}"
-
-                logger.error(f"❌ list_tools() failed (attempt {init_attempt + 1}/{max_init_retries}): {error_type}: {error_msg}")
-
-                # Log detailed error information on first attempt
-                if init_attempt == 0:
-                    import traceback
-                    logger.debug(f"First list_tools attempt error traceback:\n{traceback.format_exc()}")
-                    if hasattr(e, 'args') and e.args:
-                        logger.debug(f"Error args: {e.args}")
-                    if hasattr(e, '__dict__'):
-                        logger.debug(f"Error dict: {e.__dict__}")
-
-                # Handle "initialization not complete" errors specifically
-                # This is the key issue: server says "Received request before initialization was complete"
-                # Even though __aenter__() should have waited for initialized notification,
-                # the server might need additional time to process it
-                if ("initialization" in combined_error or
-                    "before initialization" in combined_error or
-                    "not initialized" in combined_error or
-                    "initialization was complete" in combined_error):
-                    if init_attempt < max_init_retries - 1:
-                        # Exponential backoff for initialization issues: 0.5s, 1s, 2s, 4s, 8s
-                        wait_time = 0.5 * (2 ** init_attempt)
-                        logger.warning(f"⚠️ Server initialization not complete yet (attempt {init_attempt + 1}/{max_init_retries})")
-                        logger.debug(f" Waiting {wait_time:.2f}s for server to finish initialization...")
-                        await asyncio.sleep(wait_time)
-                        continue
-                elif "invalid request" in combined_error or "invalid request parameters" in combined_error:
-                    # Invalid request often means server not ready - treat similar to initialization
-                    if init_attempt < max_init_retries - 1:
-                        # Exponential backoff for invalid request errors
-                        wait_time = 0.5 * (2 ** init_attempt)
-                        logger.warning(f"⚠️ Invalid request parameters - server may not be ready (attempt {init_attempt + 1}/{max_init_retries})")
-                        logger.debug(f" Waiting {wait_time:.2f}s before retry...")
-                        await asyncio.sleep(wait_time)
-                        continue
-                elif init_attempt < max_init_retries - 1:
-                    # Exponential backoff for other errors
-                    wait_time = 0.5 * (2 ** init_attempt)
-                    logger.debug(f"Waiting {wait_time:.2f}s before retry...")
-                    await asyncio.sleep(wait_time)
-                else:
-                    logger.error(f"❌ Could not list tools after {max_init_retries} attempts")
-                    # Don't continue - if we can't list tools, the session is not usable
-                    try:
-                        await session.__aexit__(None, None, None)
-                    except:
-                        pass
-                    try:
-                        await stdio_ctx.__aexit__(None, None, None)
-                    except:
-                        pass
-                    return None
-
-        if not tools_listed:
-            error_msg = str(last_error) if last_error else "Unknown error"
-            error_type = type(last_error).__name__ if last_error else "UnknownError"
-            logger.error(f"❌ MCP server failed to initialize - tools could not be listed")
-            logger.error(f" Last error: {error_type}: {error_msg}")
-            if last_error:
-                import traceback
-                logger.debug(f"Final error traceback:\n{traceback.format_exc()}")
-            try:
-                await session.__aexit__(None, None, None)
-            except:
-                pass
             try:
                 await stdio_ctx.__aexit__(None, None, None)
             except:
@@ -498,14 +299,12 @@ async def get_mcp_session():
         # Store both the session and stdio context to keep them alive
         global_mcp_session = session
         global_mcp_stdio_ctx = stdio_ctx
-        logger.info("✅ MCP client session created and verified successfully")
+        logger.info("✅ MCP client session created successfully")
         return session
     except Exception as e:
         error_type = type(e).__name__
         error_msg = str(e)
         logger.error(f"❌ Failed to create MCP client session: {error_type}: {error_msg}")
-        import traceback
-        logger.debug(f"Session creation error traceback:\n{traceback.format_exc()}")
         global_mcp_session = None
         global_mcp_stdio_ctx = None
         return None
@@ -522,25 +321,12 @@ async def call_agent(user_prompt: str, system_prompt: str = None, files: list =
         logger.warning("Failed to get MCP session for Gemini call")
         return ""
 
-    # Retry listing tools if it fails the first time
-    # Use more retries and longer waits since MCP server might need time
-    max_retries = 5
-    tools = None
-    for attempt in range(max_retries):
-        try:
-            tools = await session.list_tools()
-            if tools and hasattr(tools, 'tools') and len(tools.tools) > 0:
-                break
-            else:
-                raise ValueError("Empty tools list")
-        except Exception as e:
-            if attempt < max_retries - 1:
-                wait_time = 1.0 * (attempt + 1) # Progressive wait
-                logger.debug(f"Failed to list tools (attempt {attempt + 1}/{max_retries}), waiting {wait_time}s...")
-                await asyncio.sleep(wait_time)
-            else:
-                logger.error(f"❌ Failed to list MCP tools after {max_retries} attempts: {e}")
-                return ""
+    # List tools - session should be ready after proper initialization
+    try:
+        tools = await session.list_tools()
+    except Exception as e:
+        logger.error(f"❌ Failed to list MCP tools: {e}")
+        return ""
 
     if not tools or not hasattr(tools, 'tools'):
         logger.error("Invalid tools response from MCP server")
@@ -571,8 +357,6 @@ async def call_agent(user_prompt: str, system_prompt: str = None, files: list =
         if temperature is not None:
             arguments["temperature"] = temperature
 
-        logger.info(f"🔧 [MCP] Calling Gemini MCP tool '{generate_tool.name}' for: {user_prompt[:100]}...")
-        logger.info(f"📋 [MCP] Arguments: model={model}, temperature={temperature}, files={len(files) if files else 0}")
         result = await session.call_tool(generate_tool.name, arguments=arguments)
 
         # Parse result
@@ -580,14 +364,11 @@ async def call_agent(user_prompt: str, system_prompt: str = None, files: list =
             for item in result.content:
                 if hasattr(item, 'text'):
                     response_text = item.text.strip()
-                    logger.info(f"✅ [MCP] Gemini MCP returned response ({len(response_text)} chars)")
                     return response_text
-        logger.warning("⚠️ [MCP] Gemini MCP returned empty or invalid result")
+        logger.warning("⚠️ Gemini MCP returned empty or invalid result")
         return ""
     except Exception as e:
         logger.error(f"Gemini MCP call error: {e}")
-        import traceback
-        logger.debug(traceback.format_exc())
         return ""
 
 def initialize_medical_model(model_name: str):
@@ -656,8 +437,6 @@ async def transcribe_audio_gemini(audio_path: str) -> str:
         return result.strip()
     except Exception as e:
         logger.error(f"Gemini transcription error: {e}")
-        import traceback
-        logger.debug(traceback.format_exc())
         return ""
 
 def transcribe_audio(audio):
@@ -746,7 +525,7 @@ async def generate_speech_mcp(text: str) -> str:
                     return tmp_file.name
         return None
     except Exception as e:
-        logger.debug(f"MCP TTS error: {e}")
+        logger.warning(f"MCP TTS error: {e}")
         return None
 
 def generate_speech(text: str):
@@ -770,10 +549,9 @@ def generate_speech(text: str):
         else:
             audio_path = loop.run_until_complete(generate_speech_mcp(text))
             if audio_path:
-                logger.info("Generated speech via MCP")
                 return audio_path
     except Exception as e:
-        logger.debug(f"MCP TTS not available: {e}")
+        pass # MCP TTS not available, fallback to local
 
     # Fallback to local TTS model
     if not TTS_AVAILABLE:
@@ -915,19 +693,12 @@ async def search_web_mcp_tool(query: str, max_results: int = 5) -> list:
     if session is None:
         return []
 
-    # Retry listing tools if it fails the first time
-    max_retries = 3
-    tools = None
-    for attempt in range(max_retries):
-        try:
-            tools = await session.list_tools()
-            break
-        except Exception as e:
-            if attempt < max_retries - 1:
-                await asyncio.sleep(0.5 * (attempt + 1))
-            else:
-                logger.error(f"Failed to list MCP tools after {max_retries} attempts: {e}")
-                return []
+    # List tools - session should be ready after proper initialization
+    try:
+        tools = await session.list_tools()
+    except Exception as e:
+        logger.error(f"Failed to list MCP tools: {e}")
+        return []
 
     if not tools or not hasattr(tools, 'tools'):
         return []
@@ -943,7 +714,6 @@ async def search_web_mcp_tool(query: str, max_results: int = 5) -> list:
 
     if search_tool:
         try:
-            logger.info(f"🔍 [MCP] Using web search MCP tool '{search_tool.name}' for: {query[:100]}...")
            # Call the search tool
            result = await session.call_tool(
                search_tool.name,
@@ -987,7 +757,6 @@ async def search_web_mcp_tool(query: str, max_results: int = 5) -> list:
                     })
 
             if web_content:
-                logger.info(f"✅ [MCP] Web search MCP tool returned {len(web_content)} results")
                 return web_content
         except Exception as e:
             logger.error(f"Error calling web search MCP tool: {e}")
@@ -1114,7 +883,6 @@ def search_web(query: str, max_results: int = 5) -> list:
 
 async def summarize_web_content_gemini(content_list: list, query: str) -> str:
     """Summarize web search results using Gemini MCP"""
-    logger.info(f"📝 [MCP] Summarizing {len(content_list)} web search results using Gemini MCP...")
     combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]])
 
     user_prompt = f"""Summarize the following web search results related to the query: "{query}"
@@ -1133,11 +901,6 @@ Summary:"""
         temperature=0.5
     )
 
-    if result:
-        logger.info(f"✅ [MCP] Web content summarized successfully using Gemini MCP ({len(result)} chars)")
-    else:
-        logger.warning("⚠️ [MCP] Gemini MCP summarization returned empty result")
-
     return result.strip()
 
 def summarize_web_content(content_list: list, query: str) -> str:
@@ -1191,7 +954,6 @@ def get_llm_for_rag(temperature=0.7, max_new_tokens=256, top_p=0.95, top_k=50):
 
 async def autonomous_reasoning_gemini(query: str) -> dict:
     """Autonomous reasoning using Gemini MCP"""
-    logger.info(f"🧠 [MCP] Analyzing query with Gemini MCP: {query[:100]}...")
     reasoning_prompt = f"""Analyze this medical query and provide structured reasoning:
 Query: "{query}"
 Analyze:
@@ -1263,34 +1025,19 @@ def autonomous_reasoning(query: str, history: list) -> dict:
     }
 
     try:
-        logger.info("🤔 [MCP] Using Gemini MCP for autonomous reasoning...")
         loop = asyncio.get_event_loop()
         if loop.is_running():
             try:
                 import nest_asyncio
                 reasoning = nest_asyncio.run(autonomous_reasoning_gemini(query))
-                if reasoning and reasoning.get("query_type") != "general_info": # Check if we got real reasoning
-                    logger.info(f"✅ [MCP] Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
-                    return reasoning
-                else:
-                    logger.warning("⚠️ [MCP] Gemini MCP returned fallback reasoning, using it anyway")
-                    return reasoning
+                return reasoning
             except Exception as e:
-                logger.error(f"Error in nested async reasoning: {e}")
-                import traceback
-                logger.debug(traceback.format_exc())
+                logger.error(f"Error in nested async reasoning: {e}")
         else:
             reasoning = loop.run_until_complete(autonomous_reasoning_gemini(query))
-            if reasoning and reasoning.get("query_type") != "general_info":
-                logger.info(f"✅ [MCP] Gemini MCP reasoning successful: {reasoning.get('query_type')}, complexity: {reasoning.get('complexity')}")
-                return reasoning
-            else:
-                logger.warning("⚠️ [MCP] Gemini MCP returned fallback reasoning, using it anyway")
-                return reasoning
+            return reasoning
     except Exception as e:
-        logger.error(f"Gemini MCP reasoning error: {e}")
-        import traceback
-        logger.debug(traceback.format_exc())
+        logger.error(f"Gemini MCP reasoning error: {e}")
 
     # Fallback reasoning only if all attempts failed
     logger.warning("⚠️ Falling back to default reasoning")
@@ -1511,8 +1258,6 @@ async def parse_document_gemini(file_path: str, file_extension: str) -> str:
         return result.strip()
     except Exception as e:
         logger.error(f"Gemini document parsing error: {e}")
-        import traceback
-        logger.debug(traceback.format_exc())
        return ""
 
 def extract_text_from_document(file):
@@ -1812,10 +1557,8 @@ def stream_chat(
             logger.info(f"📊 Found {len(web_results)} web search results, now summarizing with Gemini MCP...")
             web_summary = summarize_web_content(web_results, message)
             if web_summary and len(web_summary) > 50: # Check if we got a real summary
-                logger.info(f"✅ [MCP] Gemini MCP summarization successful ({len(web_summary)} chars)")
                 web_context = f"\n\nAdditional Web Sources (summarized with Gemini MCP):\n{web_summary}"
             else:
-                logger.warning("⚠️ [MCP] Gemini MCP summarization failed or returned empty, using raw results")
                 # Fallback: use first result's content
                 web_context = f"\n\nAdditional Web Sources:\n{web_results[0].get('content', '')[:500]}"
             web_sources = [r['title'] for r in web_results[:3]]
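On the client side, app.py now relies on a single straightforward flow: spawn the stdio server, open a ClientSession, complete the initialize handshake, then call list_tools/call_tool directly without retry loops. A minimal sketch of that flow, assuming the same mcp SDK client names used above and using the SDK's context managers with an explicit initialize() call; the server command, args, and tool name are placeholders:

# Minimal sketch of the simplified client flow in app.py (assumed mcp SDK
# import paths; "python agent.py" and "generate_content" are placeholders).
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def demo() -> None:
    server_params = StdioServerParameters(command="python", args=["agent.py"])
    # stdio_client spawns the server process and yields (read, write) streams
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Perform the MCP initialize handshake before any other request
            await session.initialize()
            tools = await session.list_tools()
            print("tools:", [t.name for t in tools.tools])
            result = await session.call_tool("generate_content",
                                             arguments={"prompt": "hello"})
            for item in result.content:
                if hasattr(item, "text"):
                    print(item.text)

if __name__ == "__main__":
    asyncio.run(demo())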