LiamKhoaLe committed
Commit edc8faf · 1 Parent(s): dadfb77

Enhance multi-step reasoning, challengers and MAC #2

Files changed (1)
  1. app.py +261 -225
app.py CHANGED
@@ -1172,6 +1172,135 @@ def autonomous_execution_strategy(reasoning: dict, plan: dict, use_rag: bool, us
1172
 
1173
  return strategy
1174

1175
  async def self_reflection_gemini(answer: str, query: str) -> dict:
1176
  """Self-reflection using Gemini MCP"""
1177
  reflection_prompt = f"""Evaluate this medical answer for quality and completeness:
@@ -1240,138 +1369,6 @@ def self_reflection(answer: str, query: str, reasoning: dict) -> dict:
1240
 
1241
  return {"overall_score": 7, "improvement_suggestions": []}
1242
 
1243
- def _truncate_text(text: str, max_chars: int = 4000) -> str:
1244
- """Utility to keep prompts within model limits."""
1245
- if not text:
1246
- return ""
1247
- text = text.strip()
1248
- if len(text) <= max_chars:
1249
- return text
1250
- return text[:max_chars] + "... (truncated)"
1251
-
1252
- async def enhance_answer_with_gemini_async(
1253
- query: str,
1254
- initial_answer: str,
1255
- rag_context: str = "",
1256
- web_context: str = "",
1257
- reasoning: dict = None,
1258
- reflection: dict = None
1259
- ) -> str:
1260
- """Use Gemini MCP to expand and validate the medical answer with multi-step reasoning."""
1261
- if not MCP_AVAILABLE:
1262
- return ""
1263
-
1264
- evidence_sections = []
1265
- if rag_context:
1266
- evidence_sections.append(f"Document Evidence:\n{_truncate_text(rag_context, 3500)}")
1267
- if web_context:
1268
- evidence_sections.append(f"Web Evidence:\n{_truncate_text(web_context, 3500)}")
1269
- evidence_block = "\n\n".join(evidence_sections) if evidence_sections else "No external evidence provided."
1270
-
1271
- reasoning_summary = ""
1272
- if reasoning:
1273
- reasoning_summary = json.dumps({
1274
- "query_type": reasoning.get("query_type"),
1275
- "complexity": reasoning.get("complexity"),
1276
- "sub_questions": reasoning.get("sub_questions", [])
1277
- }, ensure_ascii=False)
1278
-
1279
- reflection_summary = ""
1280
- if reflection:
1281
- reflection_summary = json.dumps(reflection, ensure_ascii=False)
1282
-
1283
- enhancer_prompt = f"""You are the Gemini MCP Response Enhancer working with a medical specialist model that produced a short draft answer.
1284
- Task: Reason through the evidence, validate every claim, challenge gaps, and then produce a substantially more detailed final answer.
1285
-
1286
- Query:
1287
- {query}
1288
-
1289
- Initial Draft Answer:
1290
- {initial_answer}
1291
-
1292
- Reasoning Summary (optional):
1293
- {reasoning_summary or "None"}
1294
-
1295
- Self-Reflection Feedback (optional):
1296
- {reflection_summary or "None"}
1297
-
1298
- Evidence You Can Rely On:
1299
- {evidence_block}
1300
-
1301
- Process:
1302
- 1. Think step-by-step about what the query truly needs and whether the draft covers it.
1303
- 2. Identify inaccuracies, missing context, or shallow explanations using the evidence.
1304
- 3. Produce an enhanced answer that is longer, clinically thorough, and clearly structured.
1305
- 4. When citing, refer to Document Evidence or Web Evidence explicitly (e.g., [Document], [Web]).
1306
- 5. Keep internal reasoning private; only share the final enhanced answer.
1307
-
1308
- Format:
1309
- - Start with a concise summary paragraph.
1310
- - Follow with detailed sections (Assessment, Supporting Evidence, Recommendations, Monitoring/Risks).
1311
- - Conclude with a short bullet list of key takeaways.
1312
-
1313
- The final answer should be at least 3 paragraphs when evidence exists. Do not mention this instruction."""
1314
-
1315
- enhanced = await call_agent(
1316
- user_prompt=enhancer_prompt,
1317
- system_prompt="You are a diligent medical editor. Deliberate internally, then share only the finalized enhanced answer with structured sections.",
1318
- model=GEMINI_MODEL,
1319
- temperature=0.4
1320
- )
1321
-
1322
- return enhanced.strip() if enhanced else ""
1323
-
1324
- def enhance_answer_with_gemini(
1325
- query: str,
1326
- initial_answer: str,
1327
- rag_context: str = "",
1328
- web_context: str = "",
1329
- reasoning: dict = None,
1330
- reflection: dict = None
1331
- ) -> str:
1332
- """Sync wrapper for the Gemini response enhancer."""
1333
- if not MCP_AVAILABLE or not initial_answer:
1334
- return ""
1335
-
1336
- try:
1337
- loop = asyncio.get_event_loop()
1338
- except RuntimeError:
1339
- loop = asyncio.new_event_loop()
1340
- asyncio.set_event_loop(loop)
1341
-
1342
- try:
1343
- if loop.is_running():
1344
- try:
1345
- import nest_asyncio
1346
- enhanced = nest_asyncio.run(
1347
- enhance_answer_with_gemini_async(
1348
- query=query,
1349
- initial_answer=initial_answer,
1350
- rag_context=rag_context,
1351
- web_context=web_context,
1352
- reasoning=reasoning,
1353
- reflection=reflection
1354
- )
1355
- )
1356
- return enhanced
1357
- except Exception as e:
1358
- logger.error(f"Error in nested async enhancement: {e}")
1359
- return ""
1360
- else:
1361
- return loop.run_until_complete(
1362
- enhance_answer_with_gemini_async(
1363
- query=query,
1364
- initial_answer=initial_answer,
1365
- rag_context=rag_context,
1366
- web_context=web_context,
1367
- reasoning=reasoning,
1368
- reflection=reflection
1369
- )
1370
- )
1371
- except Exception as e:
1372
- logger.error(f"Gemini MCP enhancement error: {e}")
1373
- return ""
1374
-
1375
  async def parse_document_gemini(file_path: str, file_extension: str) -> str:
1376
  """Parse document using Gemini MCP"""
1377
  if not MCP_AVAILABLE:
@@ -1609,12 +1606,23 @@ def stream_chat(
1609
  yield history + [{"role": "assistant", "content": "Session initialization failed. Please refresh the page."}]
1610
  return
1611
 
1612
- chat_start_time = time.time()
1613
 
1614
  user_id = request.session_hash
1615
  index_dir = f"./{user_id}_index"
1616
  has_rag_index = os.path.exists(index_dir)
1617

1618
  # If agentic reasoning is disabled, skip all reasoning/planning and use MedSwin model alone
1619
  if disable_agentic_reasoning:
1620
  logger.info("🚫 Agentic reasoning disabled - using MedSwin model alone")
@@ -1649,6 +1657,23 @@ def stream_chat(
1649
  if execution_strategy["reasoning_override"]:
1650
  reasoning_note = f"\n\n💡 *Autonomous Reasoning: {execution_strategy['rationale']}*"
1651

1652
  # Detect language and translate if needed (Step 1 of plan)
1653
  original_lang = detect_language(message)
1654
  original_message = message
@@ -1675,58 +1700,77 @@ def stream_chat(
1675
  if not disable_agentic_reasoning and reasoning and reasoning.get("complexity") in ["complex", "multi_faceted"]:
1676
  base_system_prompt += f"\n\nQuery Analysis: This is a {reasoning['complexity']} {reasoning['query_type']} query. Address all sub-questions: {', '.join(reasoning.get('sub_questions', [])[:3])}"
1677

1678
  # ===== EXECUTION: RAG Retrieval (Step 2) =====
1679
  rag_context = ""
1680
  source_info = ""
1681
  if final_use_rag and has_rag_index:
1682
- embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL, token=HF_TOKEN)
1683
- Settings.embed_model = embed_model
1684
- storage_context = StorageContext.from_defaults(persist_dir=index_dir)
1685
- index = load_index_from_storage(storage_context, settings=Settings)
1686
- base_retriever = index.as_retriever(similarity_top_k=retriever_k)
1687
- auto_merging_retriever = AutoMergingRetriever(
1688
- base_retriever,
1689
- storage_context=storage_context,
1690
- simple_ratio_thresh=merge_threshold,
1691
- verbose=True
1692
- )
1693
- logger.info(f"Query: {message}")
1694
- retrieval_start = time.time()
1695
- merged_nodes = auto_merging_retriever.retrieve(message)
1696
- logger.info(f"Retrieved {len(merged_nodes)} merged nodes in {time.time() - retrieval_start:.2f}s")
1697
- merged_file_sources = {}
1698
- for node in merged_nodes:
1699
- if hasattr(node.node, 'metadata') and 'file_name' in node.node.metadata:
1700
- file_name = node.node.metadata['file_name']
1701
- if file_name not in merged_file_sources:
1702
- merged_file_sources[file_name] = 0
1703
- merged_file_sources[file_name] += 1
1704
- logger.info(f"Merged retrieval file distribution: {merged_file_sources}")
1705
- rag_context = "\n\n".join([n.node.text for n in merged_nodes])
1706
- if merged_file_sources:
1707
- source_info = "\n\nRetrieved information from files: " + ", ".join(merged_file_sources.keys())
1708
 
1709
  # ===== EXECUTION: Web Search (Step 3) =====
1710
  web_context = ""
1711
  web_sources = []
1712
  web_urls = [] # Store URLs for citations
1713
  if final_use_web_search:
1714
- logger.info("🌐 Performing web search (will use Gemini MCP for summarization)...")
1715
- web_results = search_web(message, max_results=5)
1716
- if web_results:
1717
- logger.info(f"📊 Found {len(web_results)} web search results, now summarizing with Gemini MCP...")
1718
- web_summary = summarize_web_content(web_results, message)
1719
- if web_summary and len(web_summary) > 50: # Check if we got a real summary
1720
- web_context = f"\n\nAdditional Web Sources (summarized with Gemini MCP):\n{web_summary}"
1721
- else:
1722
- # Fallback: use first result's content
1723
- web_context = f"\n\nAdditional Web Sources:\n{web_results[0].get('content', '')[:500]}"
1724
- web_sources = [r['title'] for r in web_results[:3]]
1725
- # Extract unique URLs for citations
1726
- web_urls = [r.get('url', '') for r in web_results if r.get('url')]
1727
- logger.info(f"✅ Web search completed: {len(web_results)} results, summarized with Gemini MCP")
1728
  else:
1729
- logger.warning("⚠️ Web search returned no results")
1730
 
1731
  # Build final context
1732
  context_parts = []
@@ -1811,6 +1855,20 @@ def stream_chat(
1811
  MedicalStoppingCriteria(eos_token_id, prompt_length, min_new_tokens=100)
1812
  ])
1813

1814
  streamer = TextIteratorStreamer(
1815
  medical_tokenizer,
1816
  skip_prompt=True,
@@ -1838,6 +1896,8 @@ def stream_chat(
1838
 
1839
  thread = threading.Thread(target=medical_model_obj.generate, kwargs=generation_kwargs)
1840
  thread.start()
1841
 
1842
  updated_history = history + [
1843
  {"role": "user", "content": original_message},
@@ -1846,70 +1906,42 @@ def stream_chat(
1846
  yield updated_history
1847
 
1848
  partial_response = ""
1849
- reflection_data = None
1850
- reflection_note = ""
1851
- enhancement_note = ""
1852
  try:
1853
  for new_text in streamer:
1854
  partial_response += new_text
1855
  updated_history[-1]["content"] = partial_response
1856
  yield updated_history
1857
 
1858
  # ===== SELF-REFLECTION (Step 6) =====
1859
  if not disable_agentic_reasoning and reasoning and reasoning.get("complexity") in ["complex", "multi_faceted"]:
1860
  logger.info("🔍 Performing self-reflection on answer quality...")
1861
- reflection_data = self_reflection(partial_response, message, reasoning)
1862
 
1863
- if reflection_data and (reflection_data.get("overall_score", 10) < 7 or reflection_data.get("improvement_suggestions")):
1864
- reflection_note = f"\n\n---\n**Self-Reflection** (Score: {reflection_data.get('overall_score', 'N/A')}/10)"
1865
- if reflection_data.get("improvement_suggestions"):
1866
- reflection_note += f"\n💡 Suggestions: {', '.join(reflection_data['improvement_suggestions'][:2])}"
1867
 
1868
  # Add reasoning note if autonomous override occurred
1869
- base_response = partial_response
1870
-
1871
- remaining_time = 120 - (time.time() - chat_start_time)
1872
- should_enhance = (
1873
- not disable_agentic_reasoning and
1874
- (
1875
- final_use_rag or
1876
- final_use_web_search or
1877
- (reasoning is not None)
1878
- )
1879
- )
1880
- if remaining_time <= 15:
1881
- logger.warning("⏱️ Skipping Gemini enhancement to stay within max duration.")
1882
- should_enhance = False
1883
-
1884
- if should_enhance:
1885
- logger.info("🧠 Launching Gemini MCP response enhancer for extended answer...")
1886
- enhanced_text = enhance_answer_with_gemini(
1887
- query=message,
1888
- initial_answer=base_response,
1889
- rag_context=rag_context,
1890
- web_context=web_context,
1891
- reasoning=reasoning,
1892
- reflection=reflection_data
1893
- )
1894
- if enhanced_text:
1895
- partial_response = enhanced_text
1896
- enhancement_note = "\n\n_Gemini MCP multi-step reasoning enhancer applied._"
1897
-
1898
- if reflection_data:
1899
- partial_response += f"\n\n_Self-reflection score: {reflection_data.get('overall_score', 'N/A')}/10._"
1900
- else:
1901
- if reflection_note:
1902
- partial_response = base_response + reflection_note
1903
- else:
1904
- partial_response = base_response
1905
- else:
1906
- partial_response = base_response + reflection_note if reflection_note else base_response
1907
-
1908
  if reasoning_note:
1909
- partial_response = reasoning_note + "\n\n" + partial_response
1910
-
1911
- if enhancement_note:
1912
- partial_response += enhancement_note
1913
 
1914
  # Translate back if needed
1915
  if needs_translation and partial_response:
@@ -1932,11 +1964,15 @@ def stream_chat(
1932
  if citation_links:
1933
  citations_text = "\n\n**Sources:** " + ", ".join(citation_links)
1934

1935
  # Add speaker icon and citations to assistant message
1936
  speaker_icon = ' 🔊'
1937
  partial_response_with_speaker = partial_response + citations_text + speaker_icon
1938
  updated_history[-1]["content"] = partial_response_with_speaker
1939

1940
  yield updated_history
1941
 
1942
  except GeneratorExit:
 
1172
 
1173
  return strategy
1174
 
1175
+ async def gemini_supervisor_directives_async(query: str, reasoning: dict, plan: dict, time_elapsed: float, max_duration: int = 120) -> dict:
1176
+ """Request supervisor-style task breakdown from Gemini MCP"""
1177
+ remaining_time = max(15, max_duration - time_elapsed)
1178
+ plan_json = json.dumps(plan)
1179
+ reasoning_json = json.dumps(reasoning)
1180
+
1181
+ prompt = f"""You supervise a MedSwin medical specialist model that has a limited output window (~800 tokens).
1182
+ Break the following medical query into concise sequential tasks so MedSwin can answer step-by-step.
1183
+
1184
+ Query: "{query}"
1185
+ Reasoning Analysis: {reasoning_json}
1186
+ Existing Plan: {plan_json}
1187
+ Time Remaining (soft limit): ~{remaining_time:.1f}s (hard limit {max_duration}s). Avoid more than 3 tasks.
1188
+
1189
+ Return JSON with:
1190
+ {{
1191
+ "overall_strategy": "short summary of approach (<=200 chars)",
1192
+ "tasks": [
1193
+ {{"id": 1, "instruction": "concrete directive for MedSwin", "expected_tokens": 200, "challenge": "optional short challenge to double-check"}},
1194
+ ...
1195
+ ],
1196
+ "fast_track": true/false, # true if remaining_time < 25s
1197
+ "escalation_prompt": "optional single-line reminder to wrap up quickly if time is low"
1198
+ }}
1199
+
1200
+ Ensure tasks reference medical reasoning and are ordered so MedSwin can execute one-by-one."""
1201
+
1202
+ system_prompt = (
1203
+ "You are Gemini MCP supervising a constrained MedSwin model. "
1204
+ "Produce structured JSON that keeps MedSwin focused and concise."
1205
+ )
1206
+
1207
+ response = await call_agent(
1208
+ user_prompt=prompt,
1209
+ system_prompt=system_prompt,
1210
+ model=GEMINI_MODEL,
1211
+ temperature=0.3
1212
+ )
1213
+
1214
+ try:
1215
+ start = response.find('{')
1216
+ end = response.rfind('}') + 1
1217
+ if start >= 0 and end > start:
1218
+ directives = json.loads(response[start:end])
1219
+ else:
1220
+ raise ValueError("Supervisor JSON not found")
1221
+ except Exception as exc:
1222
+ logger.error(f"Supervisor directive parsing failed: {exc}")
1223
+ directives = {
1224
+ "overall_strategy": "Address tasks sequentially with concise clinical bullets.",
1225
+ "tasks": [
1226
+ {"id": 1, "instruction": "Summarize the patient's core question.", "expected_tokens": 120},
1227
+ {"id": 2, "instruction": "List key clinical insights or differential items.", "expected_tokens": 200},
1228
+ {"id": 3, "instruction": "Deliver final guidance and follow-up recommendations.", "expected_tokens": 180},
1229
+ ],
1230
+ "fast_track": remaining_time < 25,
1231
+ "escalation_prompt": "Wrap up immediately if time is almost over."
1232
+ }
1233
+ return directives
1234
+
1235
+ def gemini_supervisor_directives(query: str, reasoning: dict, plan: dict, time_elapsed: float, max_duration: int = 120) -> dict:
1236
+ """Wrapper to obtain supervisor directives synchronously"""
1237
+ if not MCP_AVAILABLE:
1238
+ logger.warning("Gemini MCP unavailable for supervisor directives, using fallback.")
1239
+ return {
1240
+ "overall_strategy": "Follow the internal plan sequentially with concise sections.",
1241
+ "tasks": [
1242
+ {"id": step_idx + 1, "instruction": step.get("description", step.get("action", "")), "expected_tokens": 180}
1243
+ for step_idx, step in enumerate(plan.get("steps", [])[:3])
1244
+ ],
1245
+ "fast_track": False,
1246
+ "escalation_prompt": ""
1247
+ }
1248
+
1249
+ try:
1250
+ loop = asyncio.get_event_loop()
1251
+ if loop.is_running():
1252
+ try:
1253
+ import nest_asyncio
1254
+ return nest_asyncio.run(
1255
+ gemini_supervisor_directives_async(query, reasoning, plan, time_elapsed, max_duration)
1256
+ )
1257
+ except Exception as exc:
1258
+ logger.error(f"Nested supervisor directive execution failed: {exc}")
1259
+ raise
1260
+ return loop.run_until_complete(
1261
+ gemini_supervisor_directives_async(query, reasoning, plan, time_elapsed, max_duration)
1262
+ )
1263
+ except Exception as exc:
1264
+ logger.error(f"Supervisor directive request failed: {exc}")
1265
+ return {
1266
+ "overall_strategy": "Provide a concise clinical answer with numbered mini-sections.",
1267
+ "tasks": [
1268
+ {"id": 1, "instruction": "Clarify the medical problem and relevant context.", "expected_tokens": 150},
1269
+ {"id": 2, "instruction": "Give evidence-based assessment or reasoning.", "expected_tokens": 200},
1270
+ {"id": 3, "instruction": "State actionable guidance and cautions.", "expected_tokens": 150},
1271
+ ],
1272
+ "fast_track": True,
1273
+ "escalation_prompt": "Deliver the final summary immediately if time is nearly done."
1274
+ }
1275
+
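# Illustrative sketch (not part of app.py): the sync wrappers in this commit call nest_asyncio.run
# when an event loop is already running, while the pattern nest_asyncio itself documents is apply()
# followed by run_until_complete. A minimal, self-contained version of that bridge:
import asyncio

def run_coro_blocking(coro):
    """Run a coroutine from synchronous code, even inside an already-running loop (illustrative helper)."""
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    if loop.is_running():
        import nest_asyncio
        nest_asyncio.apply(loop)  # patch the running loop so run_until_complete can nest
    return loop.run_until_complete(coro)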
1276
+ def format_supervisor_directives_text(directives: dict) -> str:
1277
+ """Convert supervisor directive dict into prompt-friendly text"""
1278
+ if not directives:
1279
+ return ""
1280
+
1281
+ lines = []
1282
+ overall = directives.get("overall_strategy")
1283
+ if overall:
1284
+ lines.append(f"Supervisor Strategy: {overall}")
1285
+
1286
+ tasks = directives.get("tasks") or []
1287
+ for task in tasks:
1288
+ task_id = task.get("id")
1289
+ instruction = task.get("instruction", "").strip()
1290
+ challenge = task.get("challenge", "").strip()
1291
+ expected_tokens = task.get("expected_tokens", 180)
1292
+ if instruction:
1293
+ task_line = f"{task_id}. {instruction} (target ≤{expected_tokens} tokens)"
1294
+ if challenge:
1295
+ task_line += f" | Challenge: {challenge}"
1296
+ lines.append(task_line)
1297
+
1298
+ escalation = directives.get("escalation_prompt")
1299
+ if escalation:
1300
+ lines.append(f"Escalation: {escalation}")
1301
+
1302
+ return "\n".join(lines)
1303
+
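# Illustrative usage (not part of app.py): running the parse-failure fallback defined above
# through format_supervisor_directives_text produces the plain-text block that is later
# appended to MedSwin's system prompt:
fallback = {
    "overall_strategy": "Address tasks sequentially with concise clinical bullets.",
    "tasks": [
        {"id": 1, "instruction": "Summarize the patient's core question.", "expected_tokens": 120},
        {"id": 2, "instruction": "List key clinical insights or differential items.", "expected_tokens": 200},
        {"id": 3, "instruction": "Deliver final guidance and follow-up recommendations.", "expected_tokens": 180},
    ],
    "escalation_prompt": "Wrap up immediately if time is almost over.",
}
print(format_supervisor_directives_text(fallback))
# Supervisor Strategy: Address tasks sequentially with concise clinical bullets.
# 1. Summarize the patient's core question. (target ≤120 tokens)
# 2. List key clinical insights or differential items. (target ≤200 tokens)
# 3. Deliver final guidance and follow-up recommendations. (target ≤180 tokens)
# Escalation: Wrap up immediately if time is almost over.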
1304
  async def self_reflection_gemini(answer: str, query: str) -> dict:
1305
  """Self-reflection using Gemini MCP"""
1306
  reflection_prompt = f"""Evaluate this medical answer for quality and completeness:
 
1369
 
1370
  return {"overall_score": 7, "improvement_suggestions": []}
1371

1372
  async def parse_document_gemini(file_path: str, file_extension: str) -> str:
1373
  """Parse document using Gemini MCP"""
1374
  if not MCP_AVAILABLE:
 
1606
  yield history + [{"role": "assistant", "content": "Session initialization failed. Please refresh the page."}]
1607
  return
1608
 
1609
+ session_start = time.time()
1610
+ soft_timeout = 100
1611
+ hard_timeout = 118 # stop slightly before HF max duration (120s)
1612
+
1613
+ def elapsed():
1614
+ return time.time() - session_start
1615
 
1616
  user_id = request.session_hash
1617
  index_dir = f"./{user_id}_index"
1618
  has_rag_index = os.path.exists(index_dir)
1619
 
1620
+ supervisor_directives = None
1621
+ supervisor_directives_text = ""
1622
+ supervisor_user_note = ""
1623
+ time_pressure_flag = False
1624
+ time_pressure_message = ""
1625
+
1626
  # If agentic reasoning is disabled, skip all reasoning/planning and use MedSwin model alone
1627
  if disable_agentic_reasoning:
1628
  logger.info("🚫 Agentic reasoning disabled - using MedSwin model alone")
 
1657
  if execution_strategy["reasoning_override"]:
1658
  reasoning_note = f"\n\n💡 *Autonomous Reasoning: {execution_strategy['rationale']}*"
1659
 
1660
+ supervisor_directives = gemini_supervisor_directives(
1661
+ message,
1662
+ reasoning,
1663
+ plan,
1664
+ elapsed(),
1665
+ max_duration=120
1666
+ )
1667
+ supervisor_directives_text = format_supervisor_directives_text(supervisor_directives)
1668
+ if supervisor_directives_text:
1669
+ supervisor_user_note = f"🧭 Gemini Supervisor Tasks:\n{supervisor_directives_text}"
1670
+
1671
+ if supervisor_directives.get("fast_track"):
1672
+ logger.info("⚡ Supervisor requested fast-track execution to respect time budget.")
1673
+ final_use_web_search = False # Skip optional web search when supervisor requests fast track
1674
+ reasoning_note += "\n\n⚡ *Supervisor: Fast-track requested due to limited time. Prioritize concise synthesis.*"
1675
+
1676
+
1677
  # Detect language and translate if needed (Step 1 of plan)
1678
  original_lang = detect_language(message)
1679
  original_message = message
 
1700
  if not disable_agentic_reasoning and reasoning and reasoning.get("complexity") in ["complex", "multi_faceted"]:
1701
  base_system_prompt += f"\n\nQuery Analysis: This is a {reasoning['complexity']} {reasoning['query_type']} query. Address all sub-questions: {', '.join(reasoning.get('sub_questions', [])[:3])}"
1702
 
1703
+ if supervisor_directives_text:
1704
+ base_system_prompt += (
1705
+ f"\n\nGemini Supervisor Directives:\n{supervisor_directives_text}\n"
1706
+ "Execute the tasks one-by-one, keeping each section within the suggested token budget. "
1707
+ "If time becomes tight, summarize remaining insights immediately."
1708
+ )
1709
+
1710
  # ===== EXECUTION: RAG Retrieval (Step 2) =====
1711
  rag_context = ""
1712
  source_info = ""
1713
  if final_use_rag and has_rag_index:
1714
+ if elapsed() >= soft_timeout - 10:
1715
+ logger.warning("⏱️ Skipping RAG retrieval due to time pressure.")
1716
+ time_pressure_flag = True
1717
+ time_pressure_message = "Skipped some retrieval steps to finish within the time limit."
1718
+ final_use_rag = False
1719
+ else:
1720
+ embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL, token=HF_TOKEN)
1721
+ Settings.embed_model = embed_model
1722
+ storage_context = StorageContext.from_defaults(persist_dir=index_dir)
1723
+ index = load_index_from_storage(storage_context, settings=Settings)
1724
+ base_retriever = index.as_retriever(similarity_top_k=retriever_k)
1725
+ auto_merging_retriever = AutoMergingRetriever(
1726
+ base_retriever,
1727
+ storage_context=storage_context,
1728
+ simple_ratio_thresh=merge_threshold,
1729
+ verbose=True
1730
+ )
1731
+ logger.info(f"Query: {message}")
1732
+ retrieval_start = time.time()
1733
+ merged_nodes = auto_merging_retriever.retrieve(message)
1734
+ logger.info(f"Retrieved {len(merged_nodes)} merged nodes in {time.time() - retrieval_start:.2f}s")
1735
+ merged_file_sources = {}
1736
+ for node in merged_nodes:
1737
+ if hasattr(node.node, 'metadata') and 'file_name' in node.node.metadata:
1738
+ file_name = node.node.metadata['file_name']
1739
+ if file_name not in merged_file_sources:
1740
+ merged_file_sources[file_name] = 0
1741
+ merged_file_sources[file_name] += 1
1742
+ logger.info(f"Merged retrieval file distribution: {merged_file_sources}")
1743
+ rag_context = "\n\n".join([n.node.text for n in merged_nodes])
1744
+ if merged_file_sources:
1745
+ source_info = "\n\nRetrieved information from files: " + ", ".join(merged_file_sources.keys())
1746
 
1747
  # ===== EXECUTION: Web Search (Step 3) =====
1748
  web_context = ""
1749
  web_sources = []
1750
  web_urls = [] # Store URLs for citations
1751
  if final_use_web_search:
1752
+ if elapsed() >= soft_timeout - 5:
1753
+ logger.warning("⏱️ Skipping web search to stay within execution window.")
1754
+ time_pressure_flag = True
1755
+ time_pressure_message = "Web search skipped due to time constraints."
1756
+ final_use_web_search = False
1757
  else:
1758
+ logger.info("🌐 Performing web search (will use Gemini MCP for summarization)...")
1759
+ web_results = search_web(message, max_results=5)
1760
+ if web_results:
1761
+ logger.info(f"📊 Found {len(web_results)} web search results, now summarizing with Gemini MCP...")
1762
+ web_summary = summarize_web_content(web_results, message)
1763
+ if web_summary and len(web_summary) > 50: # Check if we got a real summary
1764
+ web_context = f"\n\nAdditional Web Sources (summarized with Gemini MCP):\n{web_summary}"
1765
+ else:
1766
+ # Fallback: use first result's content
1767
+ web_context = f"\n\nAdditional Web Sources:\n{web_results[0].get('content', '')[:500]}"
1768
+ web_sources = [r['title'] for r in web_results[:3]]
1769
+ # Extract unique URLs for citations
1770
+ web_urls = [r.get('url', '') for r in web_results if r.get('url')]
1771
+ logger.info(f"✅ Web search completed: {len(web_results)} results, summarized with Gemini MCP")
1772
+ else:
1773
+ logger.warning("⚠️ Web search returned no results")
1774
 
1775
  # Build final context
1776
  context_parts = []
 
1855
  MedicalStoppingCriteria(eos_token_id, prompt_length, min_new_tokens=100)
1856
  ])
1857
 
1858
+ def monitor_timeout():
1859
+ nonlocal time_pressure_flag, time_pressure_message
1860
+ while not stop_event.is_set():
1861
+ current_elapsed = elapsed()
1862
+ if current_elapsed >= hard_timeout:
1863
+ logger.warning("⏳ Hard timeout reached – stopping generation thread.")
1864
+ if not time_pressure_flag:
1865
+ time_pressure_flag = True
1866
+ if not time_pressure_message:
1867
+ time_pressure_message = "Stopped early to respect the 120s execution window."
1868
+ stop_event.set()
1869
+ break
1870
+ time.sleep(0.5)
1871
+
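# Illustrative sketch (not part of app.py): monitor_timeout above only sets stop_event; generation
# halts only if the generate() call checks that event. MedicalStoppingCriteria is defined elsewhere
# in app.py, so as an assumption, a stopping criterion wired to the same event could look like:
from transformers import StoppingCriteria

class StopOnEvent(StoppingCriteria):
    """Stop generation as soon as the watchdog's threading.Event is set (illustrative only)."""
    def __init__(self, stop_event):
        self.stop_event = stop_event

    def __call__(self, input_ids, scores, **kwargs):
        return self.stop_event.is_set()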
1872
  streamer = TextIteratorStreamer(
1873
  medical_tokenizer,
1874
  skip_prompt=True,
 
1896
 
1897
  thread = threading.Thread(target=medical_model_obj.generate, kwargs=generation_kwargs)
1898
  thread.start()
1899
+ timeout_thread = threading.Thread(target=monitor_timeout, daemon=True)
1900
+ timeout_thread.start()
1901
 
1902
  updated_history = history + [
1903
  {"role": "user", "content": original_message},
 
1906
  yield updated_history
1907
 
1908
  partial_response = ""
1909
  try:
1910
  for new_text in streamer:
1911
  partial_response += new_text
1912
  updated_history[-1]["content"] = partial_response
1913
  yield updated_history
1914
+
1915
+ if not time_pressure_flag and elapsed() >= soft_timeout:
1916
+ logger.warning("⏱️ Soft timeout reached – finalizing response.")
1917
+ time_pressure_flag = True
1918
+ if not time_pressure_message:
1919
+ time_pressure_message = "Soft timeout reached. Delivering final answer early."
1920
+ stop_event.set()
1921
+ break
1922
 
1923
  # ===== SELF-REFLECTION (Step 6) =====
1924
  if not disable_agentic_reasoning and reasoning and reasoning.get("complexity") in ["complex", "multi_faceted"]:
1925
  logger.info("🔍 Performing self-reflection on answer quality...")
1926
+ reflection = self_reflection(partial_response, message, reasoning)
1927
 
1928
+ # Add reflection note if score is low or improvements suggested
1929
+ if reflection.get("overall_score", 10) < 7 or reflection.get("improvement_suggestions"):
1930
+ reflection_note = f"\n\n---\n**Self-Reflection** (Score: {reflection.get('overall_score', 'N/A')}/10)"
1931
+ if reflection.get("improvement_suggestions"):
1932
+ reflection_note += f"\n💡 Suggestions: {', '.join(reflection['improvement_suggestions'][:2])}"
1933
+ partial_response += reflection_note
1934
+ updated_history[-1]["content"] = partial_response
1935
 
1936
  # Add reasoning note if autonomous override occurred
1937
+ header_sections = []
1938
+ if supervisor_user_note:
1939
+ header_sections.append(supervisor_user_note)
1940
  if reasoning_note:
1941
+ header_sections.append(reasoning_note)
1942
+ if header_sections:
1943
+ partial_response = "\n\n".join(header_sections) + "\n\n" + partial_response
1944
+ updated_history[-1]["content"] = partial_response
1945
 
1946
  # Translate back if needed
1947
  if needs_translation and partial_response:
 
1964
  if citation_links:
1965
  citations_text = "\n\n**Sources:** " + ", ".join(citation_links)
1966
 
1967
+ if time_pressure_flag and time_pressure_message:
1968
+ partial_response += f"\n\n⏱️ {time_pressure_message}"
1969
+
1970
  # Add speaker icon and citations to assistant message
1971
  speaker_icon = ' 🔊'
1972
  partial_response_with_speaker = partial_response + citations_text + speaker_icon
1973
  updated_history[-1]["content"] = partial_response_with_speaker
1974
 
1975
+ stop_event.set() # Ensure timeout monitor thread exits once response is finalized
1976
  yield updated_history
1977
 
1978
  except GeneratorExit: