Y Phung Nguyen committed
Commit 1fc52ea · 1 Parent(s): 7a7ea02

Run Gemini in thread to avoid timeout
Files changed (9)
  1. agent.py +1 -1
  2. models.py +6 -7
  3. pipeline.py +106 -13
  4. reasoning.py +0 -4
  5. requirements.txt +1 -0
  6. search.py +1 -5
  7. supervisor.py +0 -20
  8. utils.py +1 -4
  9. voice.py +0 -4
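The substantive change is the thread-offload pattern introduced in pipeline.py below: each blocking Gemini supervisor call is submitted to a small ThreadPoolExecutor and awaited with a hard 30 s cap, so a stalled API round-trip degrades to a safe fallback instead of running the GPU task past its time limit. A minimal, self-contained sketch of that pattern (stdlib only; `slow_api_call` and the other names here are illustrative, not from the repo):

import concurrent.futures
import time

# A small dedicated pool keeps network-bound work off the GPU-budgeted code path.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="offload")


def slow_api_call(query: str) -> str:
    """Stand-in for a blocking LLM/API request."""
    time.sleep(60)
    return f"answer for: {query}"


def run_in_thread(fn, *args, timeout=30.0, fallback=None, **kwargs):
    """Submit fn to the pool; on timeout, return a caller-supplied fallback."""
    future = _executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return fallback


print(run_in_thread(slow_api_call, "chest pain", timeout=1.0, fallback="(timeout fallback)"))
# -> "(timeout fallback)" after ~1 s; note the worker thread is abandoned, not killed.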
agent.py CHANGED
@@ -67,7 +67,7 @@ GEMINI_MAX_TOTAL_FILE_SIZE = int(os.environ.get("GEMINI_MAX_TOTAL_FILE_SIZE", "5
 GEMINI_TEMPERATURE = float(os.environ.get("GEMINI_TEMPERATURE", "0.2"))
 
 # Initialize MCP server
-server = Server("gemini-mcp-server")
+server = Server("server-mcp-agent")
 
 def decode_base64_file(content: str, mime_type: str = None) -> bytes:
     """Decode base64 encoded file content"""
models.py CHANGED
@@ -18,20 +18,19 @@ except ImportError:
 _model_loading_states = {}
 _model_loading_lock = threading.Lock()
 
-
+@spaces.GPU(max_duration=120)
 def set_model_loading_state(model_name: str, state: str):
     """Set model loading state: 'loading', 'loaded', 'error'"""
     with _model_loading_lock:
         _model_loading_states[model_name] = state
         logger.debug(f"Model {model_name} state set to: {state}")
 
-
+@spaces.GPU(max_duration=120)
 def get_model_loading_state(model_name: str) -> str:
     """Get model loading state: 'loading', 'loaded', 'error', or 'unknown'"""
     with _model_loading_lock:
         return _model_loading_states.get(model_name, "unknown")
 
-
 def is_model_loaded(model_name: str) -> bool:
     """Check if model is loaded and ready"""
     with _model_loading_lock:
@@ -39,7 +38,7 @@ def is_model_loaded(model_name: str) -> bool:
             config.global_medical_models[model_name] is not None and
             _model_loading_states.get(model_name) == "loaded")
 
-
+@spaces.GPU(max_duration=120)
 def initialize_medical_model(model_name: str):
     """Initialize medical model (MedSwin) - download on demand"""
     if model_name not in config.global_medical_models or config.global_medical_models[model_name] is None:
@@ -69,7 +68,7 @@ def initialize_medical_model(model_name: str):
     set_model_loading_state(model_name, "loaded")
     return config.global_medical_models[model_name], config.global_medical_tokenizers[model_name]
 
-
+@spaces.GPU(max_duration=120)
 def initialize_tts_model():
     """Initialize TTS model for text-to-speech"""
     if not TTS_AVAILABLE:
@@ -86,7 +85,7 @@ def initialize_tts_model():
         config.global_tts_model = None
     return config.global_tts_model
 
-
+@spaces.GPU(max_duration=120)
 def get_or_create_embed_model():
     """Reuse embedding model to avoid reloading weights each request"""
     if config.global_embed_model is None:
@@ -94,7 +93,7 @@ def get_or_create_embed_model():
         config.global_embed_model = HuggingFaceEmbedding(model_name=config.EMBEDDING_MODEL, token=config.HF_TOKEN)
     return config.global_embed_model
 
-
+@spaces.GPU(max_duration=120)
 def get_llm_for_rag(temperature=0.7, max_new_tokens=256, top_p=0.95, top_k=50):
     """Get LLM for RAG indexing (uses medical model)"""
     medical_model_obj, medical_tokenizer = initialize_medical_model(config.DEFAULT_MEDICAL_MODEL)
pipeline.py CHANGED
@@ -29,6 +29,84 @@ MAX_CLINICAL_QA_ROUNDS = 5
 _clinical_intake_sessions = {}
 _clinical_intake_lock = threading.Lock()
 
+# Thread pool executor for running Gemini supervisor calls without blocking GPU task
+_gemini_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="gemini-supervisor")
+
+
+def run_gemini_in_thread(fn, *args, **kwargs):
+    """
+    Run Gemini supervisor function in a separate thread to avoid blocking GPU task.
+    This ensures Gemini API calls don't consume GPU task time and cause timeouts.
+    """
+    try:
+        future = _gemini_executor.submit(fn, *args, **kwargs)
+        # Set a reasonable timeout (30s) to prevent hanging
+        result = future.result(timeout=30.0)
+        return result
+    except concurrent.futures.TimeoutError:
+        logger.error(f"[GEMINI SUPERVISOR] Function {fn.__name__} timed out after 30s")
+        # Return fallback based on function
+        if "breakdown" in fn.__name__:
+            return {
+                "sub_topics": [
+                    {"id": 1, "topic": "Answer", "instruction": args[0] if args else "Address the question", "expected_tokens": 400, "priority": "high", "approach": "direct answer"}
+                ],
+                "strategy": "Direct answer (timeout fallback)",
+                "exploration_note": "Gemini supervisor timeout"
+            }
+        elif "search_strategies" in fn.__name__:
+            return {
+                "search_strategies": [
+                    {"id": 1, "strategy": args[0] if args else "", "target_sources": 2, "focus": "main query"}
+                ],
+                "max_strategies": 1
+            }
+        elif "rag_brainstorm" in fn.__name__:
+            return {
+                "contexts": [
+                    {"id": 1, "context": args[1][:500] if len(args) > 1 else "", "focus": "retrieved information", "relevance": "high"}
+                ],
+                "max_contexts": 1
+            }
+        elif "synthesize" in fn.__name__:
+            return "\n\n".join(args[1] if len(args) > 1 else [])
+        elif "challenge" in fn.__name__:
+            return {"is_optimal": True, "completeness_score": 7, "accuracy_score": 7, "clarity_score": 7, "missing_aspects": [], "inaccuracies": [], "improvement_suggestions": [], "needs_more_context": False, "enhancement_instructions": ""}
+        elif "enhance_answer" in fn.__name__:
+            return args[1] if len(args) > 1 else ""
+        elif "check_clarity" in fn.__name__:
+            return {"is_unclear": False, "needs_search": False, "search_queries": []}
+        elif "clinical_intake_triage" in fn.__name__:
+            return {
+                "needs_additional_info": False,
+                "decision_reason": "Timeout fallback",
+                "max_rounds": args[2] if len(args) > 2 else 5,
+                "questions": [],
+                "initial_hypotheses": []
+            }
+        elif "summarize_clinical_insights" in fn.__name__:
+            return {
+                "patient_profile": "",
+                "refined_problem_statement": args[0] if args else "",
+                "key_findings": [],
+                "handoff_note": "Proceed with regular workflow."
+            }
+        else:
+            logger.warning(f"[GEMINI SUPERVISOR] Unknown function {fn.__name__}, returning None")
+            return None
+    except Exception as e:
+        logger.error(f"[GEMINI SUPERVISOR] Error running {fn.__name__} in thread: {e}")
+        # Return appropriate fallback
+        if "breakdown" in fn.__name__:
+            return {
+                "sub_topics": [
+                    {"id": 1, "topic": "Answer", "instruction": args[0] if args else "Address the question", "expected_tokens": 400, "priority": "high", "approach": "direct answer"}
+                ],
+                "strategy": "Direct answer (error fallback)",
+                "exploration_note": "Gemini supervisor error"
+            }
+        return None
+
 
 def _get_clinical_intake_state(session_id: str):
     with _clinical_intake_lock:
@@ -56,7 +134,6 @@ def _history_to_text(history: list, limit: int = 6) -> str:
         lines.append(f"{role}: {content}")
     return "\n".join(lines)
 
-
 def _format_intake_question(question: dict, round_idx: int, max_rounds: int, target_lang: str) -> str:
     header = f"🩺 Question for clarity {round_idx}/{max_rounds}"
     body = question.get("question") or "Could you share a bit more detail so I can give an accurate answer?"
@@ -214,7 +291,8 @@ def _handle_clinical_answer(session_id: str, answer_text: str):
     next_index = idx + 1
     reached_round_limit = len(state["answers"]) >= state["max_rounds"]
     if reached_round_limit or next_index >= len(questions):
-        insights = gemini_summarize_clinical_insights(state["base_query"], state["answers"])
+        # Run in thread pool to avoid blocking GPU task
+        insights = run_gemini_in_thread(gemini_summarize_clinical_insights, state["base_query"], state["answers"])
         insights_block = _format_insights_block(insights)
         refined_query = _build_refined_query(state["base_query"], insights, insights_block)
         transcript = _format_qa_transcript(state["answers"])
@@ -384,7 +462,8 @@ def stream_chat(
         clinical_intake_context_block = "\n\n".join([seg for seg in [summary_section, transcript_section] if seg])
     else:
         history_context = _history_to_text(history)
-        triage_plan = gemini_clinical_intake_triage(message, history_context, MAX_CLINICAL_QA_ROUNDS)
+        # Run in thread pool to avoid blocking GPU task
+        triage_plan = run_gemini_in_thread(gemini_clinical_intake_triage, message, history_context, MAX_CLINICAL_QA_ROUNDS)
     pipeline_diagnostics["clinical_intake"]["reason"] = triage_plan.get("decision_reason", "")
     pipeline_diagnostics["clinical_intake"]["plan"] = triage_plan.get("questions", [])
     needs_intake = triage_plan.get("needs_additional_info") and triage_plan.get("questions")
@@ -448,7 +527,10 @@ def stream_chat(
         }
     else:
         logger.info("[GEMINI SUPERVISOR] Breaking query into sub-topics...")
-        breakdown = gemini_supervisor_breakdown(message, final_use_rag, final_use_web_search, elapsed(), max_duration=120)
+        # Run in thread pool to avoid blocking GPU task
+        breakdown = run_gemini_in_thread(
+            gemini_supervisor_breakdown, message, final_use_rag, final_use_web_search, elapsed(), 120
+        )
         logger.info(f"[GEMINI SUPERVISOR] Created {len(breakdown.get('sub_topics', []))} sub-topics")
 
         # Update thoughts after breakdown
@@ -460,7 +542,8 @@ def stream_chat(
     if final_use_web_search:
         search_stage_start = time.time()
         logger.info("[GEMINI SUPERVISOR] Search mode: Creating search strategies...")
-        search_strategies = gemini_supervisor_search_strategies(message, elapsed())
+        # Run in thread pool to avoid blocking GPU task
+        search_strategies = run_gemini_in_thread(gemini_supervisor_search_strategies, message, elapsed())
 
         all_search_results = []
         strategy_jobs = []
@@ -552,7 +635,8 @@ def stream_chat(
         logger.info(f"[GEMINI SUPERVISOR] Retrieved {len(merged_nodes)} document nodes")
 
         logger.info("[GEMINI SUPERVISOR] Brainstorming RAG contexts...")
-        rag_brainstorm = gemini_supervisor_rag_brainstorm(message, retrieved_docs, elapsed())
+        # Run in thread pool to avoid blocking GPU task
+        rag_brainstorm = run_gemini_in_thread(gemini_supervisor_rag_brainstorm, message, retrieved_docs, elapsed())
         rag_contexts = [ctx.get("context", "") for ctx in rag_brainstorm.get("contexts", [])]
         logger.info(f"[GEMINI SUPERVISOR] Created {len(rag_contexts)} RAG contexts")
         record_stage("rag_retrieval", rag_stage_start)
@@ -630,7 +714,10 @@ def stream_chat(
     logger.info("[GEMINI SUPERVISOR] Synthesizing final answer from all MedSwin responses...")
     raw_medswin_answers = [ans.split('\n\n', 1)[1] if '\n\n' in ans else ans for ans in medswin_answers]
     synthesis_stage_start = time.time()
-    final_answer = gemini_supervisor_synthesize(message, raw_medswin_answers, rag_contexts, search_contexts, breakdown)
+    # Run in thread pool to avoid blocking GPU task
+    final_answer = run_gemini_in_thread(
+        gemini_supervisor_synthesize, message, raw_medswin_answers, rag_contexts, search_contexts, breakdown
+    )
    record_stage("synthesis", synthesis_stage_start)
 
     if not final_answer or len(final_answer.strip()) < 50:
@@ -658,7 +745,10 @@ def stream_chat(
         challenge_iteration += 1
         logger.info(f"[GEMINI SUPERVISOR] Challenge iteration {challenge_iteration}/{max_challenge_iterations}...")
 
-        evaluation = gemini_supervisor_challenge(message, final_answer, raw_medswin_answers, rag_contexts, search_contexts)
+        # Run in thread pool to avoid blocking GPU task
+        evaluation = run_gemini_in_thread(
+            gemini_supervisor_challenge, message, final_answer, raw_medswin_answers, rag_contexts, search_contexts
+        )
 
         if evaluation.get("is_optimal", False):
             logger.info(f"[GEMINI SUPERVISOR] Answer confirmed optimal after {challenge_iteration} iteration(s)")
@@ -670,8 +760,9 @@ def stream_chat(
             break
 
         logger.info(f"[GEMINI SUPERVISOR] Enhancing answer based on feedback...")
-        enhanced_answer = gemini_supervisor_enhance_answer(
-            message, final_answer, enhancement_instructions, raw_medswin_answers, rag_contexts, search_contexts
+        # Run in thread pool to avoid blocking GPU task
+        enhanced_answer = run_gemini_in_thread(
+            gemini_supervisor_enhance_answer, message, final_answer, enhancement_instructions, raw_medswin_answers, rag_contexts, search_contexts
         )
 
         if enhanced_answer and len(enhanced_answer.strip()) > len(final_answer.strip()) * 0.8:
@@ -685,7 +776,8 @@ def stream_chat(
     if final_use_web_search and elapsed() < soft_timeout - 10:
         logger.info("[GEMINI SUPERVISOR] Checking if additional search is needed...")
         clarity_stage_start = time.time()
-        clarity_check = gemini_supervisor_check_clarity(message, final_answer, final_use_web_search)
+        # Run in thread pool to avoid blocking GPU task
+        clarity_check = run_gemini_in_thread(gemini_supervisor_check_clarity, message, final_answer, final_use_web_search)
         record_stage("clarity_check", clarity_stage_start)
 
     if clarity_check.get("needs_search", False) and clarity_check.get("search_queries"):
@@ -715,8 +807,9 @@ def stream_chat(
         if additional_summary:
             search_contexts.append(additional_summary)
             logger.info("[GEMINI SUPERVISOR] Enhancing answer with additional search context...")
-            enhanced_with_search = gemini_supervisor_enhance_answer(
-                message, final_answer,
+            # Run in thread pool to avoid blocking GPU task
+            enhanced_with_search = run_gemini_in_thread(
+                gemini_supervisor_enhance_answer, message, final_answer,
                 f"Incorporate the following additional information from web search: {additional_summary}",
                 raw_medswin_answers, rag_contexts, search_contexts
             )
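To see the new helper end to end: when a wrapped supervisor call exceeds the 30 s cap, the caller receives the function-specific stub rather than an exception. A hypothetical run (assumes `run_gemini_in_thread` from the hunk above is importable on its own; the sleeping stub stands in for a stalled Gemini round-trip):

import time

def gemini_supervisor_check_clarity(query: str, answer: str, use_web_search: bool) -> dict:
    """Stub that simulates a Gemini call hanging past the 30 s cap."""
    time.sleep(60)
    return {"is_unclear": True, "needs_search": True, "search_queries": ["never reached"]}

clarity = run_gemini_in_thread(gemini_supervisor_check_clarity, "q", "a", True)
assert clarity == {"is_unclear": False, "needs_search": False, "search_queries": []}
# "check_clarity" matches fn.__name__, so the timeout branch returns the safe
# default and the pipeline skips the extra search round instead of hanging.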
reasoning.py CHANGED
@@ -10,7 +10,6 @@ try:
 except ImportError:
     nest_asyncio = None
 
-
 async def autonomous_reasoning_gemini(query: str) -> dict:
     """Autonomous reasoning using Gemini MCP"""
     reasoning_prompt = f"""Analyze this medical query and provide structured reasoning:
@@ -75,7 +74,6 @@ Respond in JSON format:
     logger.info(f"Reasoning analysis: {reasoning}")
     return reasoning
 
-
 def autonomous_reasoning(query: str, history: list) -> dict:
     """Autonomous reasoning: Analyze query complexity, intent, and information needs"""
     if not MCP_AVAILABLE:
@@ -113,7 +111,6 @@ def autonomous_reasoning(query: str, history: list) -> dict:
         "sub_questions": [query]
     }
 
-
 def create_execution_plan(reasoning: dict, query: str, has_rag_index: bool) -> dict:
     """Planning: Create multi-step execution plan based on reasoning analysis"""
     plan = {
@@ -172,7 +169,6 @@ def create_execution_plan(reasoning: dict, query: str, has_rag_index: bool) -> d
     logger.info(f"Execution plan created: {len(plan['steps'])} steps")
     return plan
 
-
 def autonomous_execution_strategy(reasoning: dict, plan: dict, use_rag: bool, use_web_search: bool, has_rag_index: bool) -> dict:
     """Autonomous execution: Make decisions on information gathering strategy"""
     strategy = {
requirements.txt CHANGED
@@ -13,6 +13,7 @@ google-genai
 langdetect
 gradio
 gradio[mcp]
+fastmcp
 # MCP dependencies (required for Gemini MCP)
 # Install MCP SDK: pip install mcp
 # The MCP package provides Model Context Protocol server and client functionality
search.py CHANGED
@@ -97,7 +97,6 @@ async def search_web_mcp_tool(query: str, max_results: int = MAX_SEARCH_STRATEGI
         logger.error(f"Web search MCP tool error: {e}")
         return []
 
-
 async def search_web_mcp(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
     """Search web using MCP tools - tries web search MCP tool first, then falls back to direct search"""
     results = await search_web_mcp_tool(query, max_results)
@@ -168,8 +167,7 @@ def search_web_fallback(query: str, max_results: int = MAX_SEARCH_STRATEGIES) ->
         logger.error(f"❌ [Direct API] Web search error: {e}")
     return []
 
-
-def search_web(query: str, max_results: int = 5) -> list:
+def search_web(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list:
     """Search web using MCP tools (synchronous wrapper) - prioritizes MCP over direct ddgs"""
     if MCP_AVAILABLE:
         try:
@@ -200,7 +198,6 @@ def search_web(query: str, max_results: int = 5) -> list:
     logger.info("ℹ️ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)")
     return search_web_fallback(query, max_results)
 
-
 async def summarize_web_content_gemini(content_list: list, query: str) -> str:
     """Summarize web search results using Gemini MCP"""
     combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]])
@@ -222,7 +219,6 @@ Summary:"""
 
     return result.strip()
 
-
 def summarize_web_content(content_list: list, query: str) -> str:
     """Summarize web search results using Gemini MCP"""
     if not MCP_AVAILABLE:
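The only behavioral change here is that `search_web` now defaults `max_results` to MAX_SEARCH_STRATEGIES instead of a hard-coded 5, matching the async helpers. For reference, the MCP-first-with-fallback shape these functions follow looks roughly like this (a schematic sketch, not the repo's code):

def search_with_fallback(query: str, primary, fallback, max_results: int = 5) -> list:
    """Try the primary (MCP) searcher; fall back to the direct API on error or empty results."""
    try:
        results = primary(query, max_results)
        if results:
            return results
    except Exception:
        pass  # swallow and fall through to the direct search
    return fallback(query, max_results)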
supervisor.py CHANGED
@@ -18,7 +18,6 @@ try:
 except ImportError:
     nest_asyncio = None
 
-
 async def gemini_supervisor_breakdown_async(query: str, use_rag: bool, use_web_search: bool, time_elapsed: float, max_duration: int = 120) -> dict:
     """Gemini Supervisor: Break user query into sub-topics"""
     remaining_time = max(15, max_duration - time_elapsed)
@@ -113,7 +112,6 @@ Guidelines:
     logger.warning(f"[GEMINI SUPERVISOR] Using fallback breakdown")
     return breakdown
 
-
 async def gemini_supervisor_search_strategies_async(query: str, time_elapsed: float) -> dict:
     """Gemini Supervisor: In search mode, break query into searching strategies"""
     prompt = f"""You are supervising web search for a medical query.
@@ -164,7 +162,6 @@ Keep strategies focused and avoid overlap."""
         "max_strategies": 1
     }
 
-
 def _prepare_clinical_question_plan(plan: dict, safe_rounds: int) -> dict:
     """Normalize Gemini question plan to 1-5 sequential prompts."""
     if not isinstance(plan, dict):
@@ -201,7 +198,6 @@ def _prepare_clinical_question_plan(plan: dict, safe_rounds: int) -> dict:
         plan["max_rounds"] = 0
     return plan
 
-
 async def gemini_supervisor_rag_brainstorm_async(query: str, retrieved_docs: str, time_elapsed: float) -> dict:
     """Gemini Supervisor: In RAG mode, brainstorm retrieved documents into 1-4 short contexts"""
     max_doc_length = 3000
@@ -258,7 +254,6 @@ Keep contexts brief and factual. Avoid redundancy."""
         "max_contexts": 1
     }
 
-
 async def gemini_clinical_intake_triage_async(
     query: str,
     history_context: str,
@@ -329,7 +324,6 @@ Guidelines:
         "initial_hypotheses": []
     }
 
-
 def gemini_clinical_intake_triage(
     query: str,
     history_context: str,
@@ -367,7 +361,6 @@ def gemini_clinical_intake_triage(
         "initial_hypotheses": []
     }
 
-
 async def gemini_summarize_clinical_insights_async(
     query: str,
     qa_pairs: list
@@ -427,7 +420,6 @@ Guidelines:
         "handoff_note": "Proceed with regular workflow."
     }
 
-
 def gemini_summarize_clinical_insights(query: str, qa_pairs: list) -> dict:
     """Wrapper for synchronous clinical insight summarization"""
     if not MCP_AVAILABLE:
@@ -463,7 +455,6 @@ def gemini_summarize_clinical_insights(query: str, qa_pairs: list) -> dict:
         "handoff_note": "Proceed with regular workflow."
     }
 
-
 def gemini_supervisor_breakdown(query: str, use_rag: bool, use_web_search: bool, time_elapsed: float, max_duration: int = 120) -> dict:
     """Wrapper to obtain supervisor breakdown synchronously"""
     if not MCP_AVAILABLE:
@@ -505,7 +496,6 @@ def gemini_supervisor_breakdown(query: str, use_rag: bool, use_web_search: bool,
         "exploration_note": "Fallback breakdown - single topic"
     }
 
-
 def gemini_supervisor_search_strategies(query: str, time_elapsed: float) -> dict:
     """Wrapper to obtain search strategies synchronously"""
     if not MCP_AVAILABLE:
@@ -534,7 +524,6 @@ def gemini_supervisor_search_strategies(query: str, time_elapsed: float) -> dict
         "max_strategies": 1
     }
 
-
 def gemini_supervisor_rag_brainstorm(query: str, retrieved_docs: str, time_elapsed: float) -> dict:
     """Wrapper to obtain RAG brainstorm synchronously"""
     if not MCP_AVAILABLE:
@@ -666,7 +655,6 @@ def execute_medswin_task(
         logger.error(f"[MEDSWIN] Task failed after {attempt + 1} attempts: {e}")
         raise
 
-
 async def gemini_supervisor_synthesize_async(query: str, medswin_answers: list, rag_contexts: list, search_contexts: list, breakdown: dict) -> str:
     """Gemini Supervisor: Synthesize final answer from all MedSwin responses"""
     context_summary = ""
@@ -709,7 +697,6 @@ Return the final synthesized answer in Markdown format. Do not add meta-commenta
 
     return result.strip()
 
-
 async def gemini_supervisor_challenge_async(query: str, current_answer: str, medswin_answers: list, rag_contexts: list, search_contexts: list) -> dict:
     """Gemini Supervisor: Challenge and evaluate the current answer"""
     context_info = ""
@@ -785,7 +772,6 @@ Return ONLY valid JSON:
         "enhancement_instructions": ""
     }
 
-
 async def gemini_supervisor_enhance_answer_async(query: str, current_answer: str, enhancement_instructions: str, medswin_answers: list, rag_contexts: list, search_contexts: list) -> str:
     """Gemini Supervisor: Enhance the answer based on challenge feedback"""
     context_info = ""
@@ -833,7 +819,6 @@ Return the enhanced answer in Markdown format. Do not add meta-commentary."""
 
     return result.strip()
 
-
 async def gemini_supervisor_check_clarity_async(query: str, answer: str, use_web_search: bool) -> dict:
     """Gemini Supervisor: Check if answer is unclear or supervisor is unsure"""
     if not use_web_search:
@@ -884,7 +869,6 @@ Only suggest search if the answer is genuinely unclear or has significant gaps t
         logger.error(f"[GEMINI SUPERVISOR] Clarity check parsing failed: {exc}")
         return {"is_unclear": False, "needs_search": False, "search_queries": []}
 
-
 def gemini_supervisor_synthesize(query: str, medswin_answers: list, rag_contexts: list, search_contexts: list, breakdown: dict) -> str:
     """Wrapper to synthesize answer synchronously"""
     if not MCP_AVAILABLE:
@@ -921,7 +905,6 @@ def gemini_supervisor_challenge(query: str, current_answer: str, medswin_answers
         logger.error(f"[GEMINI SUPERVISOR] Challenge failed: {exc}")
         return {"is_optimal": True, "completeness_score": 7, "accuracy_score": 7, "clarity_score": 7, "missing_aspects": [], "inaccuracies": [], "improvement_suggestions": [], "needs_more_context": False, "enhancement_instructions": ""}
 
-
 def gemini_supervisor_enhance_answer(query: str, current_answer: str, enhancement_instructions: str, medswin_answers: list, rag_contexts: list, search_contexts: list) -> str:
     """Wrapper to enhance answer synchronously"""
     if not MCP_AVAILABLE:
@@ -939,7 +922,6 @@ def gemini_supervisor_enhance_answer(query: str, current_answer: str, enhancemen
         logger.error(f"[GEMINI SUPERVISOR] Enhancement failed: {exc}")
         return current_answer
 
-
 def gemini_supervisor_check_clarity(query: str, answer: str, use_web_search: bool) -> dict:
     """Wrapper to check clarity synchronously"""
     if not MCP_AVAILABLE or not use_web_search:
@@ -957,7 +939,6 @@ def gemini_supervisor_check_clarity(query: str, answer: str, use_web_search: boo
         logger.error(f"[GEMINI SUPERVISOR] Clarity check failed: {exc}")
         return {"is_unclear": False, "needs_search": False, "search_queries": []}
 
-
 async def self_reflection_gemini(answer: str, query: str) -> dict:
     """Self-reflection using Gemini MCP"""
     reflection_prompt = f"""Evaluate this medical answer for quality and completeness:
@@ -1001,7 +982,6 @@ Respond in JSON:
     logger.info(f"Self-reflection score: {reflection.get('overall_score', 'N/A')}")
     return reflection
 
-
 def self_reflection(answer: str, query: str, reasoning: dict) -> dict:
     """Self-reflection: Evaluate answer quality and completeness"""
     if not MCP_AVAILABLE:
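supervisor.py itself only loses blank lines in this commit, but it hosts the `*_async` supervisors and the synchronous wrappers that pipeline.py now calls through `run_gemini_in_thread`. The wrapper bodies fall outside the hunks shown; given the `nest_asyncio` import in the context lines, a plausible shape for that sync-over-async bridge (an assumption, not code from this commit) is:

import asyncio

try:
    import nest_asyncio  # allows asyncio.run() inside an already-running event loop
except ImportError:
    nest_asyncio = None


async def _supervise_async(query: str) -> dict:
    await asyncio.sleep(0.1)  # stands in for a Gemini MCP round-trip
    return {"answer": query}


def supervise(query: str) -> dict:
    """Synchronous wrapper: patch the loop if possible, run the coroutine, degrade safely."""
    if nest_asyncio is not None:
        nest_asyncio.apply()
    try:
        return asyncio.run(_supervise_async(query))
    except Exception:
        return {"answer": ""}  # conservative fallback, mirroring the wrappers above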
utils.py CHANGED
@@ -119,7 +119,6 @@ def format_url_as_domain(url: str) -> str:
         return domain
     return url
 
-
 async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str:
     """Translate text using Gemini MCP"""
     if source_lang:
@@ -138,7 +137,6 @@ async def translate_text_gemini(text: str, target_lang: str = "en", source_lang:
 
     return result.strip()
 
-
 def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str:
     """Translate text using Gemini MCP"""
     if not MCP_AVAILABLE:
@@ -163,5 +161,4 @@ def translate_text(text: str, target_lang: str = "en", source_lang: str = None)
     except Exception as e:
         logger.error(f"Gemini MCP translation error: {e}")
 
-    return text
-
+    return text
voice.py CHANGED
@@ -13,7 +13,6 @@ try:
 except ImportError:
     nest_asyncio = None
 
-
 async def transcribe_audio_gemini(audio_path: str) -> str:
     """Transcribe audio using Gemini MCP transcribe_audio tool"""
     if not MCP_AVAILABLE:
@@ -70,7 +69,6 @@ async def transcribe_audio_gemini(audio_path: str) -> str:
         logger.error(f"Gemini transcription error: {e}")
         return ""
 
-
 def transcribe_audio(audio):
     """Transcribe audio to text using Gemini MCP"""
     if audio is None:
@@ -112,7 +110,6 @@ def transcribe_audio(audio):
         logger.error(f"Transcription error: {e}")
         return ""
 
-
 async def generate_speech_mcp(text: str) -> str:
     """Generate speech using MCP text_to_speech tool"""
     if not MCP_AVAILABLE:
@@ -166,7 +163,6 @@ async def generate_speech_mcp(text: str) -> str:
         logger.warning(f"MCP TTS error: {e}")
         return None
 
-
 def generate_speech(text: str):
     """Generate speech from text using TTS model (with MCP fallback)"""
     if not text or len(text.strip()) == 0: