#!/usr/bin/env python3
"""
Gemini MCP Server
A Python-based MCP server that provides Gemini AI capabilities via Model Context Protocol.
This server implements the generate_content tool for translation, summarization, document parsing, and transcription.
"""

import os
import sys
import base64
import asyncio
import logging
from typing import Any, Sequence

# MCP imports
try:
    from mcp import types as mcp_types
    from mcp.server import Server, NotificationOptions
    from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
    from mcp.server.models import InitializationOptions
except ImportError:
    print("Error: MCP SDK not installed. Install with: pip install mcp", file=sys.stderr)
    sys.exit(1)

# Gemini imports
try:
    from google import genai
    GEMINI_AVAILABLE = True
except ImportError:
    print("Error: google-genai not installed. Install with: pip install google-genai", file=sys.stderr)
    sys.exit(1)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set MCP logging to WARNING to reduce noise
mcp_logger = logging.getLogger("mcp")
mcp_logger.setLevel(logging.WARNING)
root_logger = logging.getLogger()  # no-arg getLogger() returns the actual root logger
root_logger.setLevel(logging.INFO)

# Initialize Gemini
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    logger.error("GEMINI_API_KEY not set in environment variables")
    sys.exit(1)

# Initialize Gemini client
gemini_client = genai.Client(api_key=GEMINI_API_KEY)

# Configuration from environment
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
GEMINI_MODEL_LITE = os.environ.get("GEMINI_MODEL_LITE", "gemini-2.5-flash-lite")
GEMINI_TIMEOUT = int(os.environ.get("GEMINI_TIMEOUT", "300000"))  # milliseconds
GEMINI_MAX_OUTPUT_TOKENS = int(os.environ.get("GEMINI_MAX_OUTPUT_TOKENS", "8192"))
GEMINI_MAX_FILES = int(os.environ.get("GEMINI_MAX_FILES", "10"))
GEMINI_MAX_TOTAL_FILE_SIZE = int(os.environ.get("GEMINI_MAX_TOTAL_FILE_SIZE", "50"))  # MB
GEMINI_TEMPERATURE = float(os.environ.get("GEMINI_TEMPERATURE", "0.2"))
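
# Example environment setup (illustrative values; only GEMINI_API_KEY is required,
# the rest fall back to the defaults above):
#   export GEMINI_API_KEY="<your key>"
#   export GEMINI_MODEL="gemini-2.5-flash"
#   export GEMINI_TIMEOUT="300000"          # 5 minutes, in milliseconds
#   export GEMINI_MAX_TOTAL_FILE_SIZE="50"  # in MB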

# Create MCP server
app = Server("gemini-mcp-server")

def decode_base64_file(content: str) -> bytes:
    """Decode base64-encoded file content."""
    try:
        return base64.b64decode(content)
    except Exception as e:
        logger.error(f"Error decoding base64 content: {e}")
        raise

def prepare_gemini_files(files: list) -> list:
    """Prepare files for the Gemini API, enforcing the configured count and size limits."""
    gemini_parts = []
    total_size = 0

    # Enforce the configured file-count limit
    if len(files) > GEMINI_MAX_FILES:
        logger.warning(f"Too many files ({len(files)}); using only the first {GEMINI_MAX_FILES}")
        files = files[:GEMINI_MAX_FILES]

    for file_obj in files:
        try:
            # Handle file with path
            if "path" in file_obj:
                file_path = file_obj["path"]
                mime_type = file_obj.get("type")

                if not os.path.exists(file_path):
                    logger.warning(f"File not found: {file_path}")
                    continue

                # Read file
                with open(file_path, 'rb') as f:
                    file_data = f.read()

                # Auto-detect MIME type if not provided
                if not mime_type:
                    from mimetypes import guess_type
                    mime_type, _ = guess_type(file_path)
                    if not mime_type:
                        mime_type = "application/octet-stream"

            # Handle file with base64 content
            elif "content" in file_obj:
                file_data = decode_base64_file(file_obj["content"])
                mime_type = file_obj.get("type", "application/octet-stream")
            else:
                logger.warning("File object must have either 'path' or 'content'")
                continue

            # Enforce the total-size budget (GEMINI_MAX_TOTAL_FILE_SIZE is in MB)
            total_size += len(file_data)
            if total_size > GEMINI_MAX_TOTAL_FILE_SIZE * 1024 * 1024:
                logger.warning(f"Total file size exceeds {GEMINI_MAX_TOTAL_FILE_SIZE} MB; skipping remaining files")
                break

            # Add to Gemini parts
            gemini_parts.append({
                "mime_type": mime_type,
                "data": file_data
            })

        except Exception as e:
            logger.error(f"Error processing file: {e}")
            continue

    return gemini_parts
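
# Example usage of prepare_gemini_files (illustrative; the path is hypothetical):
#   prepare_gemini_files([
#       {"path": "/tmp/report.pdf"},                    # read from disk, MIME auto-detected
#       {"content": "aGVsbG8=", "type": "text/plain"},  # inline base64 payload ("hello")
#   ])
#   # -> [{"mime_type": "application/pdf", "data": b"..."},
#   #     {"mime_type": "text/plain", "data": b"hello"}]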

@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools"""
    try:
        tools = [
            Tool(
                name="generate_content",
                description="Generate content using Gemini AI. Supports text generation, translation, summarization, document parsing, and audio transcription.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_prompt": {
                            "type": "string",
                            "description": "User prompt for generation (required)"
                        },
                        "system_prompt": {
                            "type": "string",
                            "description": "System prompt to guide AI behavior (optional)"
                        },
                        "files": {
                            "type": "array",
                            "description": "Array of files to include in generation (optional)",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "path": {"type": "string", "description": "Path to file"},
                                    "content": {"type": "string", "description": "Base64 encoded file content"},
                                    "type": {"type": "string", "description": "MIME type (auto-detected from file extension)"}
                                }
                            }
                        },
                        "model": {
                            "type": "string",
                            "description": f"Gemini model to use (default: {GEMINI_MODEL})"
                        },
                        "temperature": {
                            "type": "number",
                            "description": f"Temperature for generation 0-2 (default: {GEMINI_TEMPERATURE})"
                        }
                    },
                    "required": ["user_prompt"]
                }
            )
        ]
        return tools
    except Exception as e:
        logger.error(f"Error in list_tools(): {e}")
        raise
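
# Example generate_content call arguments, matching the schema above
# (illustrative values; the file path is hypothetical):
#   {
#       "user_prompt": "Summarize the attached document in three bullet points.",
#       "system_prompt": "You are a concise technical summarizer.",
#       "files": [{"path": "/tmp/spec.pdf"}],
#       "temperature": 0.1
#   }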

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool calls"""
    logger.info(f"🔵 MCP tool call received: {name}")
    if name == "generate_content":
        try:
            user_prompt = arguments.get("user_prompt")
            if not user_prompt:
                logger.error("❌ user_prompt is required but missing")
                return [TextContent(type="text", text="Error: user_prompt is required")]
            
            system_prompt = arguments.get("system_prompt")
            files = arguments.get("files", [])
            model = arguments.get("model", GEMINI_MODEL)
            temperature = float(arguments.get("temperature", GEMINI_TEMPERATURE))
            
            # Build the request contents. The google-genai API accepts contents
            # as a list of parts; the prompt text goes first, file parts follow.
            contents = user_prompt

            # If a system prompt is provided, prepend it to the user prompt
            if system_prompt:
                contents = f"{system_prompt}\n\n{user_prompt}"

            # Text content is the first part
            gemini_contents = [contents]
            
            # Add file content if provided
            if files:
                try:
                    file_parts = prepare_gemini_files(files)
                    # Convert file parts to the format expected by Gemini API
                    for file_part in file_parts:
                        # The API expects parts with inline_data for binary content
                        gemini_contents.append({
                            "inline_data": {
                                "mime_type": file_part["mime_type"],
                                "data": base64.b64encode(file_part["data"]).decode('utf-8')
                            }
                        })
                    logger.info(f"Added {len(file_parts)} file(s) to Gemini request")
                except Exception as e:
                    logger.warning(f"Error preparing files: {e}, continuing with text only")
            
            # Generate content using Gemini API
            try:
                # Prepare generation config
                generation_config = {
                    "temperature": temperature,
                    "max_output_tokens": GEMINI_MAX_OUTPUT_TOKENS
                }
                
                # Convert timeout from milliseconds to seconds, cap at 20s to stay under 120s function limit
                timeout_seconds = min(GEMINI_TIMEOUT / 1000.0, 20.0)
                logger.info(f"🔵 Calling Gemini API with model={model}, timeout={timeout_seconds}s...")
                
                # Use asyncio.to_thread to make the blocking call async
                # The API accepts contents as a list and config as a separate parameter
                def generate_sync():
                    return gemini_client.models.generate_content(
                        model=model,
                        contents=gemini_contents,
                        config=generation_config,
                    )
                
                response = await asyncio.wait_for(
                    asyncio.to_thread(generate_sync),
                    timeout=timeout_seconds
                )
                logger.info(f"✅ Gemini API call completed successfully")
                
                # Extract text from response
                if response and hasattr(response, 'text') and response.text:
                    return [TextContent(type="text", text=response.text)]
                elif response and hasattr(response, 'candidates') and response.candidates:
                    # Fall back to collecting text from the response candidates
                    text_parts = []
                    for candidate in response.candidates:
                        content = getattr(candidate, 'content', None)
                        if content and getattr(content, 'parts', None):
                            for part in content.parts:
                                # part.text can be None for non-text parts; skip those
                                if getattr(part, 'text', None):
                                    text_parts.append(part.text)
                    if text_parts:
                        text = ''.join(text_parts)
                        return [TextContent(type="text", text=text)]
                    else:
                        logger.warning("Gemini returned response but no text found")
                        return [TextContent(type="text", text="Error: No text in Gemini response")]
                else:
                    logger.warning("Gemini returned empty response")
                    return [TextContent(type="text", text="Error: No response from Gemini")]
                    
            except asyncio.TimeoutError:
                # Reuse the timeout_seconds computed above; recomputing it here
                # with a different cap would report a timeout that was never applied
                error_msg = f"Gemini API call timed out after {timeout_seconds}s"
                logger.error(f"❌ {error_msg}")
                return [TextContent(type="text", text=f"Error: {error_msg}")]
            except Exception as e:
                logger.error(f"❌ Error generating content: {type(e).__name__}: {e}")
                import traceback
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
                
        except Exception as e:
            logger.error(f"Error in generate_content: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]

async def main():
    """Main entry point"""
    logger.info("=" * 60)
    logger.info("Starting Gemini MCP Server...")
    logger.info(f"Gemini API Key: {'Set' if GEMINI_API_KEY else 'Not Set'}")
    logger.info(f"Default Model: {GEMINI_MODEL}")
    logger.info(f"Default Lite Model: {GEMINI_MODEL_LITE}")
    logger.info("=" * 60)
    
    # Use stdio_server from mcp.server.stdio
    from mcp.server.stdio import stdio_server
    
    # Keep logging enabled for debugging; save the current level so it can be
    # restored on fatal errors (no-arg getLogger() is the actual root logger)
    original_root_level = logging.getLogger().level
    logging.getLogger().setLevel(logging.INFO)
    
    try:
        async with stdio_server() as streams:
            # Prepare server capabilities for initialization
            try:
                if hasattr(app, "get_capabilities"):
                    notification_options = NotificationOptions()
                    experimental_capabilities: dict[str, dict[str, Any]] = {}
                    server_capabilities = app.get_capabilities(
                        notification_options=notification_options,
                        experimental_capabilities=experimental_capabilities,
                    )
                else:
                    server_capabilities = mcp_types.ServerCapabilities()
            except Exception as cap_error:
                logger.warning(f"Failed to gather server capabilities: {cap_error}")
                server_capabilities = mcp_types.ServerCapabilities()

            init_options = InitializationOptions(
                server_name="gemini-mcp-server",
                server_version="1.0.0",
                capabilities=server_capabilities,
            )

            logger.info("MCP server ready")
            try:
                # Run the server - it will automatically handle the initialization handshake
                await app.run(
                    read_stream=streams[0],
                    write_stream=streams[1],
                    initialization_options=init_options,
                )
            except Exception as run_error:
                logger.error(f"Error in app.run(): {run_error}")
                raise
    except Exception as e:
        logging.getLogger().setLevel(original_root_level)
        logger.error(f"MCP server fatal error: {type(e).__name__}: {e}")
        raise

if __name__ == "__main__":
    asyncio.run(main())
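
# Run directly over stdio (a sketch; the file name is hypothetical and
# GEMINI_API_KEY must be set, or the process exits at startup):
#   GEMINI_API_KEY="<your key>" python gemini_mcp_server.py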