#!/usr/bin/env python3
"""
Gemini MCP Server
A Python-based MCP server that exposes Gemini AI capabilities via the Model Context Protocol.
It implements four tools: generate_content (translation, summarization, document parsing),
transcribe_audio, text_to_speech (delegated to the client's local TTS), and search_web.
"""

import os
import sys
import json
import base64
import asyncio
import logging
import traceback
from typing import Any, Sequence

# MCP imports
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    # Additional imports needed for server functionality
    from mcp import types as mcp_types
    from mcp.types import ImageContent, EmbeddedResource
    from mcp.server.models import InitializationOptions
    from mcp.server import NotificationOptions
except ImportError:
    print("Error: MCP SDK not installed. Install with: pip install mcp", file=sys.stderr)
    sys.exit(1)

# Gemini imports
try:
    from google import genai
    GEMINI_AVAILABLE = True
except ImportError:
    print("Error: google-genai not installed. Install with: pip install google-genai", file=sys.stderr)
    sys.exit(1)

# MAX_SEARCH_STRATEGIES also serves as the default result count for the search_web tool
from supervisor import MAX_SEARCH_STRATEGIES

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Quiet the MCP SDK logger to reduce noise; keep the root logger at INFO
mcp_logger = logging.getLogger("mcp")
mcp_logger.setLevel(logging.WARNING)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

# Initialize Gemini
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    logger.error("GEMINI_API_KEY not set in environment variables")
    sys.exit(1)

# Initialize Gemini client
gemini_client = genai.Client(api_key=GEMINI_API_KEY)

# Configuration from environment
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
GEMINI_MODEL_LITE = os.environ.get("GEMINI_MODEL_LITE", "gemini-2.5-flash-lite")
GEMINI_TIMEOUT = int(os.environ.get("GEMINI_TIMEOUT", "300000"))  # milliseconds
GEMINI_MAX_OUTPUT_TOKENS = int(os.environ.get("GEMINI_MAX_OUTPUT_TOKENS", "8192"))
GEMINI_MAX_FILES = int(os.environ.get("GEMINI_MAX_FILES", "10"))
GEMINI_MAX_TOTAL_FILE_SIZE = int(os.environ.get("GEMINI_MAX_TOTAL_FILE_SIZE", "50"))  # MB
GEMINI_TEMPERATURE = float(os.environ.get("GEMINI_TEMPERATURE", "0.2"))
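
# All of the settings above come from the environment. A sketch of overriding
# them before launch (the values below are hypothetical examples, not defaults):
#
#   export GEMINI_MODEL="gemini-2.5-pro"
#   export GEMINI_TIMEOUT="120000"            # milliseconds (120 s)
#   export GEMINI_MAX_OUTPUT_TOKENS="4096"
#   export GEMINI_MAX_TOTAL_FILE_SIZE="25"    # MB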

# Initialize MCP server
server = Server("gemini-mcp-server")

def decode_base64_file(content: str) -> bytes:
    """Decode base64-encoded file content."""
    try:
        return base64.b64decode(content)
    except Exception as e:
        logger.error(f"Error decoding base64 content: {e}")
        raise

def prepare_gemini_files(files: list) -> list:
    """Prepare files for the Gemini API, enforcing the configured file limits."""
    gemini_parts = []
    total_size = 0
    
    if len(files) > GEMINI_MAX_FILES:
        logger.warning(f"Received {len(files)} files; only the first {GEMINI_MAX_FILES} will be used")
        files = files[:GEMINI_MAX_FILES]
    
    for file_obj in files:
        try:
            # Handle file with path
            if "path" in file_obj:
                file_path = file_obj["path"]
                mime_type = file_obj.get("type")
                
                if not os.path.exists(file_path):
                    logger.warning(f"File not found: {file_path}")
                    continue
                
                # Read file
                with open(file_path, 'rb') as f:
                    file_data = f.read()
                
                # Auto-detect MIME type if not provided
                if not mime_type:
                    from mimetypes import guess_type
                    mime_type, _ = guess_type(file_path)
                    if not mime_type:
                        mime_type = "application/octet-stream"
            
            # Handle file with base64 content
            elif "content" in file_obj:
                file_data = decode_base64_file(file_obj["content"])
                mime_type = file_obj.get("type", "application/octet-stream")
            else:
                logger.warning("File object must have either 'path' or 'content'")
                continue
            
            # Enforce the configured total-size budget (GEMINI_MAX_TOTAL_FILE_SIZE is in MB)
            total_size += len(file_data)
            if total_size > GEMINI_MAX_TOTAL_FILE_SIZE * 1024 * 1024:
                logger.warning(f"Total file size exceeds {GEMINI_MAX_TOTAL_FILE_SIZE} MB; skipping remaining files")
                break
            
            # Add to Gemini parts
            gemini_parts.append({
                "mime_type": mime_type,
                "data": file_data
            })
            
        except Exception as e:
            logger.error(f"Error processing file: {e}")
            continue
    
    return gemini_parts
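
# Illustrative call shapes for prepare_gemini_files (paths and payloads below are
# hypothetical; each file dict carries either "path" or base64 "content"):
#
#   prepare_gemini_files([
#       {"path": "/tmp/photo.jpg"},                          # MIME type auto-detected
#       {"content": "<base64 bytes>", "type": "audio/wav"},  # inline payload
#   ])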

@server.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools"""
    try:
        tools = [
            Tool(
                name="generate_content",
                description="Generate content using Gemini AI. Supports text generation, translation, summarization, document parsing, and audio transcription.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_prompt": {
                            "type": "string",
                            "description": "User prompt for generation (required)"
                        },
                        "system_prompt": {
                            "type": "string",
                            "description": "System prompt to guide AI behavior (optional)"
                        },
                        "files": {
                            "type": "array",
                            "description": "Array of files to include in generation (optional)",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "path": {"type": "string", "description": "Path to file"},
                                    "content": {"type": "string", "description": "Base64 encoded file content"},
                                    "type": {"type": "string", "description": "MIME type (auto-detected from file extension)"}
                                }
                            }
                        },
                        "model": {
                            "type": "string",
                            "description": f"Gemini model to use (default: {GEMINI_MODEL})"
                        },
                        "temperature": {
                            "type": "number",
                            "description": f"Temperature for generation 0-2 (default: {GEMINI_TEMPERATURE})"
                        }
                    },
                    "required": ["user_prompt"]
                }
            ),
            Tool(
                name="transcribe_audio",
                description="Transcribe audio file to text using Gemini AI. Supports various audio formats (WAV, MP3, M4A, etc.).",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "audio_path": {
                            "type": "string",
                            "description": "Path to audio file to transcribe (required)"
                        },
                        "language": {
                            "type": "string",
                            "description": "Language code (optional, defaults to auto-detect)"
                        }
                    },
                    "required": ["audio_path"]
                }
            ),
            Tool(
                name="text_to_speech",
                description="Convert text to speech audio using Gemini AI. Returns path to generated audio file.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to speech (required)"
                        },
                        "language": {
                            "type": "string",
                            "description": "Language code (optional, defaults to 'en')"
                        },
                        "voice": {
                            "type": "string",
                            "description": "Voice selection (optional)"
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="search_web",
                description="Search the web for information. Returns search results with titles, URLs, and content snippets.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query string (required)"
                        },
                        "max_results": {
                            "type": "integer",
                            "description": "Maximum number of results to return (optional, defaults to 5)",
                            "default": MAX_SEARCH_STRATEGIES
                        }
                    },
                    "required": ["query"]
                }
            )
        ]
        return tools
    except Exception as e:
        logger.error(f"Error in list_tools(): {e}")
        raise
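
# An illustrative `arguments` payload for the generate_content tool (all values
# hypothetical; only user_prompt is required by the schema above):
#
#   {
#       "user_prompt": "Summarize the attached document in three bullet points.",
#       "system_prompt": "You are a concise technical summarizer.",
#       "files": [{"path": "/tmp/report.pdf", "type": "application/pdf"}],
#       "temperature": 0.2
#   }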

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool calls"""
    logger.info(f"πŸ”΅ MCP tool call received: {name}")
    if name == "generate_content":
        try:
            user_prompt = arguments.get("user_prompt")
            if not user_prompt:
                logger.error("❌ user_prompt is required but missing")
                return [TextContent(type="text", text="Error: user_prompt is required")]
            
            system_prompt = arguments.get("system_prompt")
            files = arguments.get("files", [])
            model = arguments.get("model", GEMINI_MODEL)
            temperature = float(arguments.get("temperature", GEMINI_TEMPERATURE))
            
            # Build the text content; if a system prompt is provided, prepend it to the user prompt
            contents = f"{system_prompt}\n\n{user_prompt}" if system_prompt else user_prompt
            
            # The google-genai API expects contents as a list of parts; the text part goes first
            gemini_contents = [contents]
            
            # Add file content if provided
            if files:
                try:
                    file_parts = prepare_gemini_files(files)
                    # Convert file parts to the format expected by Gemini API
                    for file_part in file_parts:
                        # The API expects parts with inline_data for binary content
                        gemini_contents.append({
                            "inline_data": {
                                "mime_type": file_part["mime_type"],
                                "data": base64.b64encode(file_part["data"]).decode('utf-8')
                            }
                        })
                    logger.info(f"Added {len(file_parts)} file(s) to Gemini request")
                except Exception as e:
                    logger.warning(f"Error preparing files: {e}, continuing with text only")
            
            # Generate content using Gemini API
            try:
                # Prepare generation config
                generation_config = {
                    "temperature": temperature,
                    "max_output_tokens": GEMINI_MAX_OUTPUT_TOKENS
                }
                
                # Convert timeout from milliseconds to seconds
                # Cap at 18s to leave buffer for client timeout (25s) and communication overhead
                # This ensures server completes before client times out
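                # e.g. the default GEMINI_TIMEOUT=300000 ms would give 300 s, capped to 18 s here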
                timeout_seconds = min(GEMINI_TIMEOUT / 1000.0, 18.0)
                logger.info(f"πŸ”΅ Calling Gemini API with model={model}, timeout={timeout_seconds}s...")
                
                # Use asyncio.to_thread to make the blocking call async
                # The API accepts contents as a list and config as a separate parameter
                def generate_sync():
                    try:
                        logger.debug(f"Calling Gemini API synchronously (model={model})...")
                        result = gemini_client.models.generate_content(
                            model=model,
                            contents=gemini_contents,
                            config=generation_config,
                        )
                        logger.debug("Gemini API synchronous call completed")
                        return result
                    except Exception as sync_error:
                        logger.error(f"Error in synchronous Gemini API call: {type(sync_error).__name__}: {sync_error}")
                        raise
                
                logger.debug(f"Starting async wrapper for Gemini API call (timeout={timeout_seconds}s)...")
                response = await asyncio.wait_for(
                    asyncio.to_thread(generate_sync),
                    timeout=timeout_seconds
                )
                logger.info(f"βœ… Gemini API call completed successfully")
                
                # Extract text from response
                if response and hasattr(response, 'text') and response.text:
                    return [TextContent(type="text", text=response.text)]
                elif response and hasattr(response, 'candidates') and response.candidates:
                    # Try to extract text from candidates if response is a list of candidates
                    text_parts = []
                    for candidate in response.candidates:
                        if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'):
                            for part in candidate.content.parts:
                                if hasattr(part, 'text'):
                                    text_parts.append(part.text)
                    if text_parts:
                        text = ''.join(text_parts)
                        return [TextContent(type="text", text=text)]
                    else:
                        logger.warning("Gemini returned response but no text found")
                        return [TextContent(type="text", text="Error: No text in Gemini response")]
                else:
                    logger.warning("Gemini returned empty response")
                    return [TextContent(type="text", text="Error: No response from Gemini")]
                    
            except asyncio.TimeoutError:
                error_msg = f"Gemini API call timed out after {timeout_seconds}s"
                logger.error(f"❌ {error_msg}")
                logger.error(f"   Model: {model}, Prompt length: {len(user_prompt)} chars")
                logger.error(f"   This may indicate network issues, API rate limiting, or the request is too complex")
                return [TextContent(type="text", text=f"Error: {error_msg}. The request may be too complex or there may be network issues.")]
            except Exception as e:
                logger.error(f"❌ Error generating content: {type(e).__name__}: {e}")
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
                
        except Exception as e:
            logger.error(f"Error in generate_content: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    elif name == "transcribe_audio":
        try:
            audio_path = arguments.get("audio_path")
            if not audio_path:
                logger.error("❌ audio_path is required but missing")
                return [TextContent(type="text", text="Error: audio_path is required")]
            
            language = arguments.get("language", "auto")
            
            # Check if file exists
            if not os.path.exists(audio_path):
                logger.error(f"❌ Audio file not found: {audio_path}")
                return [TextContent(type="text", text=f"Error: Audio file not found: {audio_path}")]
            
            # Use Gemini to transcribe audio
            system_prompt = "You are a professional transcription service. Provide accurate, well-formatted transcripts."
            user_prompt = (
                "Please transcribe this audio file. Include speaker identification if multiple "
                "speakers are present, format the transcript with proper punctuation and paragraphs, "
                "and ignore mumbling and other non-verbal noises."
            )
            if language != "auto":
                user_prompt += f" The audio is in language '{language}'."
            
            files = [{"path": os.path.abspath(audio_path)}]
            
            try:
                generation_config = {
                    "temperature": 0.2,
                    "max_output_tokens": GEMINI_MAX_OUTPUT_TOKENS
                }
                
                timeout_seconds = min(GEMINI_TIMEOUT / 1000.0, 20.0)
                logger.info(f"πŸ”΅ Transcribing audio with Gemini API, timeout={timeout_seconds}s...")
                
                gemini_contents = [f"{system_prompt}\n\n{user_prompt}"]
                file_parts = prepare_gemini_files(files)
                for file_part in file_parts:
                    gemini_contents.append({
                        "inline_data": {
                            "mime_type": file_part["mime_type"],
                            "data": base64.b64encode(file_part["data"]).decode('utf-8')
                        }
                    })
                
                def transcribe_sync():
                    return gemini_client.models.generate_content(
                        model=GEMINI_MODEL_LITE,
                        contents=gemini_contents,
                        config=generation_config,
                    )
                
                response = await asyncio.wait_for(
                    asyncio.to_thread(transcribe_sync),
                    timeout=timeout_seconds
                )
                
                logger.info(f"βœ… Audio transcription completed successfully")
                
                if response and hasattr(response, 'text') and response.text:
                    return [TextContent(type="text", text=response.text.strip())]
                elif response and hasattr(response, 'candidates') and response.candidates:
                    text_parts = []
                    for candidate in response.candidates:
                        if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'):
                            for part in candidate.content.parts:
                                if hasattr(part, 'text'):
                                    text_parts.append(part.text)
                    if text_parts:
                        text = ''.join(text_parts).strip()
                        return [TextContent(type="text", text=text)]
                    else:
                        return [TextContent(type="text", text="Error: No text in transcription response")]
                else:
                    return [TextContent(type="text", text="Error: No response from transcription")]
                    
            except asyncio.TimeoutError:
                error_msg = f"Audio transcription timed out"
                logger.error(f"❌ {error_msg}")
                return [TextContent(type="text", text=f"Error: {error_msg}")]
            except Exception as e:
                logger.error(f"❌ Error transcribing audio: {type(e).__name__}: {e}")
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
                
        except Exception as e:
            logger.error(f"Error in transcribe_audio: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    elif name == "text_to_speech":
        try:
            text = arguments.get("text")
            if not text:
                logger.error("❌ text is required but missing")
                return [TextContent(type="text", text="Error: text is required")]
            
            language = arguments.get("language", "en")
            
            # Note: Gemini API doesn't directly support TTS audio generation
            # This tool is provided for MCP protocol compliance, but the client
            # should use local TTS models (like maya1) for actual audio generation
            logger.info(f"πŸ”΅ TTS request received for text: {text[:50]}...")
            logger.info("ℹ️ Gemini API doesn't support direct TTS. Client should use local TTS model.")
            
            # Return a signal that client should handle TTS locally
            # The client will interpret this and use its local TTS model
            return [TextContent(type="text", text="USE_LOCAL_TTS")]
                    
        except Exception as e:
            logger.error(f"Error in text_to_speech: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    elif name == "search_web":
        try:
            query = arguments.get("query")
            if not query:
                logger.error("❌ query is required but missing")
                return [TextContent(type="text", text="Error: query is required")]
            
            max_results = int(arguments.get("max_results", MAX_SEARCH_STRATEGIES))
            
            # Use DuckDuckGo for web search
            try:
                from ddgs import DDGS
                import requests
                from bs4 import BeautifulSoup
            except ImportError:
                logger.error("DuckDuckGo dependencies not available (ddgs, requests, beautifulsoup4)")
                return [TextContent(type="text", text="Error: Web search dependencies not available")]
            
            logger.info(f"πŸ”΅ Performing web search for: {query[:100]}...")
            
            try:
                with DDGS() as ddgs:
                    results = list(ddgs.text(query, max_results=max_results))
                    web_content = []
                    for result in results:
                        try:
                            url = result.get('href', '')
                            title = result.get('title', '')
                            snippet = result.get('body', '')
                            
                            try:
                                response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'})
                                if response.status_code == 200:
                                    soup = BeautifulSoup(response.content, 'html.parser')
                                    for script in soup(["script", "style"]):
                                        script.decompose()
                                    text = soup.get_text()
                                    lines = (line.strip() for line in text.splitlines())
                                    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
                                    text = ' '.join(chunk for chunk in chunks if chunk)
                                    # Keep at most 500 characters of page text alongside the snippet
                                    if len(text) > 500:
                                        text = text[:500] + "..."
                                    web_content.append({
                                        'title': title,
                                        'url': url,
                                        'content': (snippet + "\n" + text) if text else snippet
                                    })
                                else:
                                    web_content.append({
                                        'title': title,
                                        'url': url,
                                        'content': snippet
                                    })
                            except Exception:
                                # Page fetch/parse failed; fall back to the search snippet alone
                                web_content.append({
                                    'title': title,
                                    'url': url,
                                    'content': snippet
                                })
                        except Exception as e:
                            logger.error(f"Error processing search result: {e}")
                            continue
                    
                    # Return results as JSON string
                    results_json = json.dumps(web_content, indent=2)
                    logger.info(f"βœ… Web search completed: {len(web_content)} results")
                    return [TextContent(type="text", text=results_json)]
            except Exception as e:
                logger.error(f"❌ Web search error: {type(e).__name__}: {e}")
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
                
        except Exception as e:
            logger.error(f"Error in search_web: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]

async def main():
    """Main entry point"""
    logger.info("=" * 60)
    logger.info("Starting Gemini MCP Server...")
    logger.info(f"Gemini API Key: {'Set' if GEMINI_API_KEY else 'Not Set'}")
    logger.info(f"Default Model: {GEMINI_MODEL}")
    logger.info(f"Default Lite Model: {GEMINI_MODEL_LITE}")
    logger.info("=" * 60)
    
    # Keep logging enabled for debugging
    original_root_level = logging.getLogger().level
    logging.getLogger().setLevel(logging.INFO)
    
    try:
        # Use stdio_server from mcp.server.stdio
        from mcp.server.stdio import stdio_server
        async with stdio_server() as streams:
            # Prepare server capabilities for initialization
            try:
                if hasattr(server, "get_capabilities"):
                    notification_options = NotificationOptions()
                    experimental_capabilities: dict[str, dict[str, Any]] = {}
                    server_capabilities = server.get_capabilities(
                        notification_options=notification_options,
                        experimental_capabilities=experimental_capabilities,
                    )
                else:
                    server_capabilities = mcp_types.ServerCapabilities()
            except Exception as cap_error:
                logger.warning(f"Failed to gather server capabilities: {cap_error}")
                server_capabilities = mcp_types.ServerCapabilities()

            init_options = InitializationOptions(
                server_name="gemini-mcp-server",
                server_version="1.0.0",
                capabilities=server_capabilities,
            )

            logger.info("MCP server ready")
            try:
                # Run the server - it will automatically handle the initialization handshake
                await server.run(
                    read_stream=streams[0],
                    write_stream=streams[1],
                    initialization_options=init_options,
                )
            except Exception as run_error:
                logger.error(f"Error in server.run(): {run_error}")
                raise
    except Exception as e:
        logging.getLogger("root").setLevel(original_root_level)
        logger.error(f"MCP server fatal error: {type(e).__name__}: {e}")
        raise

if __name__ == "__main__":
    asyncio.run(main())
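
# A minimal client-registration sketch, assuming a Claude Desktop-style MCP
# client config (the server name, script path, and key below are hypothetical):
#
#   {
#       "mcpServers": {
#           "gemini": {
#               "command": "python3",
#               "args": ["/path/to/gemini_mcp_server.py"],
#               "env": {"GEMINI_API_KEY": "<your-key>"}
#           }
#       }
#   }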