# MedLLM-Agent / reasoning.py
# Author: Y Phung Nguyen
# Last commit: "Fix Gemini MCP error handling and session management" (4a43fcc)
# (Original header was raw web-UI residue — "raw / history blame / 6.82 kB" —
#  which is not valid Python; preserved here as a comment.)
"""Autonomous reasoning and execution planning"""
import json
import asyncio
from logger import logger
from client import MCP_AVAILABLE, call_agent
from config import GEMINI_MODEL
try:
import nest_asyncio
except ImportError:
nest_asyncio = None
async def autonomous_reasoning_gemini(query: str) -> dict:
    """Autonomous reasoning using Gemini MCP.

    Asks the model to classify the query (type, complexity, required tools)
    and parses the JSON object embedded in its reply.

    Args:
        query: The user's medical question.

    Returns:
        dict with keys ``query_type``, ``complexity``, ``information_needs``,
        ``requires_rag``, ``requires_web_search``, ``sub_questions``.
        A conservative default is returned when the model response is empty
        or cannot be parsed as JSON.
    """

    def _default_reasoning() -> dict:
        # Single source of truth for the fallback; this dict was previously
        # duplicated verbatim in both failure paths below.
        return {
            "query_type": "general_info",
            "complexity": "moderate",
            "information_needs": ["medical information"],
            "requires_rag": True,
            "requires_web_search": False,
            "sub_questions": [query],
        }

    reasoning_prompt = f"""Analyze this medical query and provide structured reasoning:
Query: "{query}"
Analyze:
1. Query Type: (diagnosis, treatment, drug_info, symptom_analysis, research, general_info)
2. Complexity: (simple, moderate, complex, multi_faceted)
3. Information Needs: What specific information is required?
4. Requires RAG: (yes/no) - Does this need document context?
5. Requires Web Search: (yes/no) - Does this need current/updated information?
6. Sub-questions: Break down into key sub-questions if complex
Respond in JSON format:
{{
"query_type": "...",
"complexity": "...",
"information_needs": ["..."],
"requires_rag": true/false,
"requires_web_search": true/false,
"sub_questions": ["..."]
}}"""
    system_prompt = "You are a medical reasoning system. Analyze queries systematically and provide structured JSON responses."
    response = await call_agent(
        user_prompt=reasoning_prompt,
        system_prompt=system_prompt,
        model=GEMINI_MODEL,
        temperature=0.3,  # low temperature: classification should be near-deterministic
    )
    if not response or not response.strip():
        logger.warning("Gemini MCP returned empty response for reasoning, using fallback")
        return _default_reasoning()
    try:
        # The model may wrap the JSON in prose; extract the outermost braces.
        json_start = response.find('{')
        json_end = response.rfind('}') + 1
        if json_start >= 0 and json_end > json_start:
            reasoning = json.loads(response[json_start:json_end])
        else:
            raise ValueError("No JSON found in response")
    except ValueError as e:
        # Narrowed from a blanket Exception: covers both the explicit raise
        # above and json.JSONDecodeError (a ValueError subclass).
        logger.error(f"Failed to parse reasoning response as JSON: {e}")
        logger.debug(f"Response was: {response[:200]}...")
        reasoning = _default_reasoning()
    logger.info(f"Reasoning analysis: {reasoning}")
    return reasoning
def autonomous_reasoning(query: str, history: list) -> dict:
    """Autonomous reasoning: Analyze query complexity, intent, and information needs.

    Synchronous wrapper around ``autonomous_reasoning_gemini`` that handles
    both "no running event loop" and "already inside a running loop" cases.

    Args:
        query: The user's medical question.
        history: Conversation history (currently unused; kept for API stability).

    Returns:
        Structured reasoning dict; a safe default when Gemini MCP is
        unavailable or the call fails.
    """

    def _default_reasoning() -> dict:
        # Fallback used whenever Gemini MCP cannot be consulted.
        return {
            "query_type": "general_info",
            "complexity": "moderate",
            "information_needs": ["medical information"],
            "requires_rag": True,
            "requires_web_search": False,
            "sub_questions": [query],
        }

    if not MCP_AVAILABLE:
        logger.info("ℹ️ Gemini MCP not available for reasoning, using fallback (app will continue to work normally)")
        return _default_reasoning()
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            if nest_asyncio:
                # BUG FIX: nest_asyncio has no ``run`` attribute; the supported
                # pattern is apply() to patch the loop, then run_until_complete().
                nest_asyncio.apply(loop)
                return loop.run_until_complete(autonomous_reasoning_gemini(query))
            # BUG FIX: this branch previously logged the error but fell through
            # and implicitly returned None, violating the -> dict contract.
            logger.error("Error in nested async reasoning: nest_asyncio not available")
            return _default_reasoning()
        return loop.run_until_complete(autonomous_reasoning_gemini(query))
    except Exception as e:
        logger.error(f"Gemini MCP reasoning error: {e}")
        logger.warning("⚠️ Falling back to default reasoning")
        return _default_reasoning()
def create_execution_plan(reasoning: dict, query: str, has_rag_index: bool) -> dict:
    """Planning: Create multi-step execution plan based on reasoning analysis.

    Args:
        reasoning: Structured analysis from autonomous reasoning.
        query: The original user query (currently unused; kept for API stability).
        has_rag_index: Whether a document index is available for retrieval.

    Returns:
        dict with "steps" (ordered action dicts numbered from 1), "strategy"
        ("sequential" or "iterative"), and "iterations".
    """
    plan = {
        "steps": [],
        "strategy": "sequential",
        "iterations": 1,
    }
    # .get avoids a KeyError when the (possibly model-produced) reasoning dict
    # is missing the "complexity" key.
    is_complex = reasoning.get("complexity") in ("complex", "multi_faceted")
    if is_complex:
        plan["strategy"] = "iterative"
        plan["iterations"] = 2

    def _add_step(action: str, description: str, **extra) -> None:
        # BUG FIX: step numbers were hard-coded 1/2/3/4 even though every step
        # is optional, producing gaps and duplicate numbers (e.g. a skipped RAG
        # step made "web_search" and "synthesize_answer" both step 3).
        # Number each step by its actual position instead.
        plan["steps"].append({
            "step": len(plan["steps"]) + 1,
            "action": action,
            "description": description,
            **extra,
        })

    _add_step("detect_language", "Detect query language and translate if needed")
    if reasoning.get("requires_rag", True) and has_rag_index:
        _add_step(
            "rag_retrieval",
            "Retrieve relevant document context",
            parameters={"top_k": 15, "merge_threshold": 0.5},
        )
    if reasoning.get("requires_web_search", False):
        _add_step(
            "web_search",
            "Search web for current/updated information",
            parameters={"max_results": 5},
        )
    sub_questions = reasoning.get("sub_questions")
    if sub_questions and len(sub_questions) > 1:
        _add_step(
            "multi_step_reasoning",
            "Process sub-questions iteratively",
            sub_questions=sub_questions,
        )
    _add_step("synthesize_answer", "Generate comprehensive answer from all sources")
    if is_complex:
        _add_step("self_reflection", "Evaluate answer quality and completeness")
    logger.info(f"Execution plan created: {len(plan['steps'])} steps")
    return plan
def autonomous_execution_strategy(reasoning: dict, plan: dict, use_rag: bool, use_web_search: bool, has_rag_index: bool) -> dict:
    """Autonomous execution: Make decisions on information gathering strategy.

    Echoes the user's tool toggles back as the effective strategy, attaching a
    rationale note when the reasoning analysis disagrees with the web-search
    setting (the user's choice is never overridden).
    """
    wants_web = reasoning.get("requires_web_search", False)
    # Note the mismatch between what reasoning recommends and what is enabled.
    rationale = (
        "Reasoning suggests web search for current information, but the user kept it disabled."
        if wants_web and not use_web_search
        else ""
    )
    strategy = {
        "use_rag": use_rag,
        "use_web_search": use_web_search,
        "reasoning_override": False,
        "rationale": rationale,
    }
    if rationale:
        logger.info(f"Autonomous reasoning note: {rationale}")
    return strategy