# azure_unified_assistant.py
"""
Azure OpenAI Unified Assistant with GPT-4o Realtime
Supports: Voice I/O, Streaming Text, Document Q&A, Vision
"""
import os
import json
import asyncio
import base64
import queue
import threading
import time
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple, AsyncGenerator
from dataclasses import dataclass, field
from enum import Enum

import gradio as gr
import numpy as np
import websockets
from openai import AzureOpenAI, AsyncAzureOpenAI
from dotenv import load_dotenv
import logging

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ========================= CONFIGURATION =========================
@dataclass
class UnifiedAssistantConfig:
    """Configuration for GPT-4o Realtime Unified Assistant"""
    # Azure Configuration
    azure_endpoint: str
    api_key: str
    api_version: str = "2024-10-01-preview"

    # Model Configuration
    model_name: str = "gpt-4o-realtime-preview-2024-10-01"
    deployment_name: str = "gpt-4o-realtime-preview"

    # Vector Storage
    vector_store_id: Optional[str] = None

    # Audio Configuration
    voice: str = "alloy"  # alloy, echo, fable, onyx, nova, shimmer
    audio_format: str = "pcm16"
    sample_rate: int = 24000

    # Model Parameters
    temperature: float = 0.7
    max_tokens: int = 4096
    top_p: float = 0.95

    # System Configuration
    enable_streaming: bool = True
    enable_voice: bool = True
    enable_vision: bool = True
    enable_functions: bool = True

    # WebSocket Configuration
    ws_timeout: int = 60
    ws_max_size: int = 10 * 1024 * 1024  # 10 MB
class ResponseType(Enum):
    """Response types from the unified model"""
    TEXT = "text"
    AUDIO = "audio"
    TRANSCRIPT = "transcript"
    FUNCTION = "function"
    ERROR = "error"
    DONE = "done"
# ========================= UNIFIED ASSISTANT =========================
class GPT4oRealtimeAssistant:
    """
    Unified Assistant using GPT-4o Realtime Preview
    Single model for all capabilities
    """

    def __init__(self, config: UnifiedAssistantConfig):
        self.config = config
        self.websocket = None
        self.session_active = False
        self.conversation_id = None

        # Initialize clients
        self._initialize_clients()

        # Audio handling
        self.audio_queue = queue.Queue()
        self.is_recording = False

        logger.info(f"✅ Initialized GPT-4o Realtime Assistant with model: {config.model_name}")

    def _initialize_clients(self):
        """Initialize both REST and WebSocket clients"""
        try:
            # REST client for standard operations
            self.rest_client = AzureOpenAI(
                azure_endpoint=self.config.azure_endpoint,
                api_key=self.config.api_key,
                api_version=self.config.api_version
            )

            # Async REST client for streaming
            self.async_client = AsyncAzureOpenAI(
                azure_endpoint=self.config.azure_endpoint,
                api_key=self.config.api_key,
                api_version=self.config.api_version
            )

            logger.info("✅ REST clients initialized successfully")
        except Exception as e:
            logger.error(f"❌ Failed to initialize clients: {str(e)}")
            raise
    # ==================== REALTIME WEBSOCKET CONNECTION ====================
    async def connect_realtime(self):
        """Establish WebSocket connection for realtime interaction"""
        if self.session_active:
            logger.info("Session already active")
            return

        try:
            # Construct WebSocket URL
            ws_endpoint = self.config.azure_endpoint.replace("https://", "wss://")
            ws_url = f"{ws_endpoint}/openai/realtime?api-version={self.config.api_version}&deployment={self.config.deployment_name}"

            headers = {
                "api-key": self.config.api_key,
                "OpenAI-Beta": "realtime=v1"
            }

            # Connect with timeout
            self.websocket = await asyncio.wait_for(
                websockets.connect(
                    ws_url,
                    extra_headers=headers,
                    max_size=self.config.ws_max_size
                ),
                timeout=self.config.ws_timeout
            )

            # Configure session
            await self._configure_session()

            self.session_active = True
            self.conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            logger.info(f"✅ Realtime session connected: {self.conversation_id}")
        except asyncio.TimeoutError:
            logger.error("❌ WebSocket connection timeout")
            raise
        except Exception as e:
            logger.error(f"❌ Failed to connect: {str(e)}")
            raise
    async def _configure_session(self):
        """Configure the realtime session with all capabilities"""
        session_config = {
            "type": "session.update",
            "session": {
                "model": self.config.model_name,
                "modalities": ["text", "audio"],
                "voice": self.config.voice,
                "instructions": """You are a helpful AI assistant with access to:
1. Realtime voice conversations
2. Document search and analysis
3. Streaming text responses
4. Image understanding
5. Function calling capabilities

Provide clear, accurate, and helpful responses.
For document questions, cite sources when available.
Keep voice responses concise and natural.""",
                # Audio Configuration
                "input_audio_format": self.config.audio_format,
                "output_audio_format": self.config.audio_format,
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                # Voice Activity Detection
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500
                },
                # Tools Configuration
                "tools": self._get_tools_config(),
                "tool_choice": "auto",
                # Model Parameters
                "temperature": self.config.temperature,
                "max_response_output_tokens": self.config.max_tokens
            }
        }

        await self.websocket.send(json.dumps(session_config))

        # Wait for session confirmation (the server emits "session.created" on
        # connect and "session.updated" once the update is applied)
        response = await self.websocket.recv()
        data = json.loads(response)
        if data.get("type") in ("session.created", "session.updated"):
            logger.info("✅ Session configured successfully")
        else:
            logger.warning(f"⚠️ Unexpected session response: {data.get('type')}")
    def _get_tools_config(self) -> List[Dict]:
        """Get tools configuration based on settings"""
        tools = []

        if self.config.vector_store_id:
            tools.append({
                "type": "file_search",
                "file_search": {
                    "vector_store_ids": [self.config.vector_store_id]
                }
            })

        if self.config.enable_functions:
            tools.append({
                "type": "code_interpreter"
            })

        return tools
    # ==================== AUDIO HANDLING ====================
    async def send_audio(self, audio_data: bytes):
        """Send audio input to the model"""
        if not self.session_active:
            await self.connect_realtime()

        try:
            # Convert to base64
            audio_base64 = base64.b64encode(audio_data).decode('utf-8')

            message = {
                "type": "input_audio_buffer.append",
                "audio": audio_base64
            }
            await self.websocket.send(json.dumps(message))

            # Commit audio buffer to trigger processing
            await self.websocket.send(json.dumps({
                "type": "input_audio_buffer.commit"
            }))
        except Exception as e:
            logger.error(f"❌ Error sending audio: {str(e)}")
            raise

    async def process_audio_stream(self, audio_stream: AsyncGenerator[bytes, None]):
        """Process continuous audio stream"""
        if not self.session_active:
            await self.connect_realtime()

        try:
            async for audio_chunk in audio_stream:
                await self.send_audio(audio_chunk)
        except Exception as e:
            logger.error(f"❌ Error processing audio stream: {str(e)}")
            raise
    # ==================== TEXT HANDLING ====================
    async def send_text(self, text: str, use_realtime: bool = False):
        """Send text input - can use either realtime or REST API"""
        if use_realtime:
            # Use realtime WebSocket
            if not self.session_active:
                await self.connect_realtime()

            message = {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": text
                        }
                    ]
                }
            }
            await self.websocket.send(json.dumps(message))

            # Trigger response
            await self.websocket.send(json.dumps({
                "type": "response.create",
                "response": {
                    "modalities": ["text", "audio"] if self.config.enable_voice else ["text"]
                }
            }))
        else:
            # Use REST API for text-only; callers of this branch expect a
            # complete response object, so streaming is disabled here
            return self.rest_client.chat.completions.create(
                model=self.config.deployment_name,
                messages=[
                    {"role": "user", "content": text}
                ],
                stream=False,
                temperature=self.config.temperature,
                max_tokens=self.config.max_tokens
            )
    # ==================== STREAMING RESPONSES ====================
    async def stream_response(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream responses from the realtime connection"""
        if not self.session_active:
            logger.error("❌ No active session")
            return

        try:
            while True:
                message = await asyncio.wait_for(
                    self.websocket.recv(),
                    timeout=self.config.ws_timeout
                )
                data = json.loads(message)
                response_type = data.get("type", "")

                # Handle different response types
                if response_type == "response.audio.delta":
                    yield {
                        "type": ResponseType.AUDIO,
                        "data": base64.b64decode(data.get("delta", ""))
                    }
                elif response_type == "response.text.delta":
                    yield {
                        "type": ResponseType.TEXT,
                        "data": data.get("delta", "")
                    }
                elif response_type == "response.audio_transcript.delta":
                    yield {
                        "type": ResponseType.TRANSCRIPT,
                        "data": data.get("delta", "")
                    }
                elif response_type == "response.function_call_arguments.delta":
                    yield {
                        "type": ResponseType.FUNCTION,
                        "data": json.loads(data.get("delta", "{}"))
                    }
                elif response_type == "response.done":
                    yield {
                        "type": ResponseType.DONE,
                        "data": None
                    }
                    break
                elif response_type == "error":
                    yield {
                        "type": ResponseType.ERROR,
                        "data": data.get("error", {})
                    }
                    break
        except asyncio.TimeoutError:
            logger.error("❌ Response timeout")
            yield {
                "type": ResponseType.ERROR,
                "data": {"message": "Response timeout"}
            }
        except Exception as e:
            logger.error(f"❌ Stream error: {str(e)}")
            yield {
                "type": ResponseType.ERROR,
                "data": {"message": str(e)}
            }
    # ==================== DOCUMENT Q&A ====================
    async def query_documents(self, query: str, stream: bool = True):
        """Query documents using vector search (async generator: yields text chunks)"""
        if not self.config.vector_store_id:
            yield "No vector store configured. Please set up a vector store ID."
            return

        try:
            # Create assistant with vector search
            assistant = self.rest_client.beta.assistants.create(
                model=self.config.deployment_name,
                name="Document Q&A Assistant",
                instructions="""Answer questions based on the provided documents.
Cite sources when possible.
If information is not in the documents, say so clearly.""",
                tools=[{"type": "file_search"}],
                tool_resources={
                    "file_search": {
                        "vector_store_ids": [self.config.vector_store_id]
                    }
                },
                temperature=self.config.temperature
            )

            # Create thread
            thread = self.rest_client.beta.threads.create()

            # Add message
            self.rest_client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=query
            )

            if stream:
                # Run with streaming
                with self.rest_client.beta.threads.runs.create_and_stream(
                    thread_id=thread.id,
                    assistant_id=assistant.id,
                ) as run_stream:
                    for event in run_stream:
                        if event.event == 'thread.message.delta':
                            if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'content'):
                                for content in event.data.delta.content:
                                    if content.type == 'text' and hasattr(content.text, 'value'):
                                        yield content.text.value
            else:
                # Non-streaming response
                run = self.rest_client.beta.threads.runs.create(
                    thread_id=thread.id,
                    assistant_id=assistant.id
                )

                # Wait for completion
                while run.status in ['queued', 'in_progress']:
                    time.sleep(1)
                    run = self.rest_client.beta.threads.runs.retrieve(
                        thread_id=thread.id,
                        run_id=run.id
                    )

                if run.status == 'completed':
                    messages = self.rest_client.beta.threads.messages.list(
                        thread_id=thread.id
                    )
                    if messages.data:
                        yield messages.data[0].content[0].text.value
                else:
                    yield f"Query failed: {run.status}"
        except Exception as e:
            logger.error(f"❌ Document query error: {str(e)}")
            yield f"Error querying documents: {str(e)}"
    # ==================== MULTIMODAL HANDLING ====================
    async def analyze_image(self, image_path: str, question: str):
        """Analyze image with text/voice response"""
        try:
            # Read and encode image
            with open(image_path, "rb") as image_file:
                image_data = base64.b64encode(image_file.read()).decode('utf-8')

            if self.session_active:
                # Use realtime connection for voice response
                message = {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [
                            {
                                "type": "input_text",
                                "text": question
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_data}",
                                    "detail": "high"
                                }
                            }
                        ]
                    }
                }
                await self.websocket.send(json.dumps(message))

                # Trigger response
                await self.websocket.send(json.dumps({
                    "type": "response.create"
                }))

                # Stream response
                async for response in self.stream_response():
                    yield response
            else:
                # Use REST API
                response = self.rest_client.chat.completions.create(
                    model=self.config.deployment_name,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": question},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{image_data}",
                                        "detail": "high"
                                    }
                                }
                            ]
                        }
                    ],
                    max_tokens=self.config.max_tokens,
                    temperature=self.config.temperature
                )
                yield {
                    "type": ResponseType.TEXT,
                    "data": response.choices[0].message.content
                }
        except Exception as e:
            logger.error(f"❌ Image analysis error: {str(e)}")
            yield {
                "type": ResponseType.ERROR,
                "data": {"message": str(e)}
            }
    # ==================== CLEANUP ====================
    async def disconnect(self):
        """Disconnect realtime session"""
        if self.websocket:
            await self.websocket.close()
            self.session_active = False
            logger.info("✅ Disconnected from realtime session")
# ========================= GRADIO INTERFACE =========================
class UnifiedGradioInterface:
    """Enhanced Gradio interface for the unified assistant"""

    def __init__(self, assistant: GPT4oRealtimeAssistant):
        self.assistant = assistant
        self.conversation_history = []
        self.audio_processor = AudioProcessor()

    def create_interface(self) -> gr.Blocks:
        """Create the complete Gradio interface"""
        with gr.Blocks(
            theme=gr.themes.Soft(),
            title="Azure GPT-4o Realtime Assistant",
            css=self._get_custom_css()
        ) as interface:
            # Header
            gr.Markdown(
                """
                # 🚀 Azure GPT-4o Realtime Assistant

                **Unified Model with All Capabilities:**
                - 🎙️ **Voice:** Realtime voice conversations with ~300ms latency
                - ⚡ **Streaming:** Live text streaming responses
                - 📄 **Documents:** Vector search and Q&A on your documents
                - 🖼️ **Vision:** Image analysis and understanding
                - 🔧 **Functions:** Tool calling and code interpretation

                **Model:** `gpt-4o-realtime-preview-2024-10-01` | **Context:** 128K tokens
                """
            )

            # State management
            state = gr.State({
                "session_active": False,
                "conversation_id": None
            })
            with gr.Tabs() as tabs:
                # ============ TAB 1: STREAMING CHAT ============
                with gr.Tab("💬 Streaming Chat", elem_id="chat-tab"):
                    with gr.Row():
                        with gr.Column(scale=4):
                            chatbot = gr.Chatbot(
                                height=500,
                                show_label=False,
                                elem_id="chatbot",
                                bubble_full_width=False
                            )
                            with gr.Row():
                                msg = gr.Textbox(
                                    placeholder="Type your message here... (Press Enter to send)",
                                    label="Message",
                                    scale=4,
                                    lines=2
                                )
                                with gr.Column(scale=1):
                                    send_btn = gr.Button("Send", variant="primary", size="lg")
                                    clear_btn = gr.Button("Clear", variant="secondary")
                        with gr.Column(scale=1):
                            gr.Markdown("### Settings")
                            streaming_mode = gr.Radio(
                                choices=["Stream", "Complete"],
                                value="Stream",
                                label="Response Mode"
                            )
                            temperature = gr.Slider(
                                minimum=0,
                                maximum=1,
                                value=0.7,
                                step=0.1,
                                label="Temperature"
                            )
                            gr.Markdown("### Status")
                            status_display = gr.Markdown("🟢 Ready")
                    # Chat handlers
                    async def handle_chat(message, history, mode):
                        if not message:
                            yield history
                            return

                        history = history or []
                        history.append([message, ""])

                        try:
                            if mode == "Stream":
                                response = ""
                                # Stream text response
                                await self.assistant.send_text(message, use_realtime=True)
                                async for chunk in self.assistant.stream_response():
                                    if chunk["type"] == ResponseType.TEXT:
                                        response += chunk["data"]
                                        history[-1][1] = response
                                        yield history
                                    elif chunk["type"] == ResponseType.DONE:
                                        break
                            else:
                                # Complete response
                                result = await self.assistant.send_text(message, use_realtime=False)
                                history[-1][1] = result.choices[0].message.content
                                yield history
                        except Exception as e:
                            history[-1][1] = f"❌ Error: {str(e)}"
                            yield history

                    msg.submit(
                        handle_chat,
                        [msg, chatbot, streaming_mode],
                        chatbot
                    ).then(
                        lambda: "",
                        outputs=msg
                    )

                    send_btn.click(
                        handle_chat,
                        [msg, chatbot, streaming_mode],
                        chatbot
                    ).then(
                        lambda: "",
                        outputs=msg
                    )

                    clear_btn.click(
                        lambda: ([], ""),
                        outputs=[chatbot, msg]
                    )
                # ============ TAB 2: VOICE INTERACTION ============
                with gr.Tab("🎙️ Voice Chat", elem_id="voice-tab"):
                    gr.Markdown(
                        """
                        ### Realtime Voice Interaction
                        Talk naturally with the assistant using your microphone.
                        """
                    )

                    with gr.Row():
                        with gr.Column():
                            # Connection control
                            with gr.Row():
                                connect_btn = gr.Button("🔌 Connect Voice", variant="primary")
                                disconnect_btn = gr.Button("🔌 Disconnect", variant="stop")

                            connection_status = gr.Markdown("⚪ Not connected")

                            # Audio interface
                            audio_input = gr.Audio(
                                sources=["microphone"],
                                type="numpy",
                                streaming=True,
                                label="🎤 Speak to Assistant"
                            )
                            audio_output = gr.Audio(
                                label="🔊 Assistant Response",
                                type="numpy",
                                autoplay=True
                            )
                        with gr.Column():
                            # Conversation display
                            transcript = gr.Textbox(
                                label="Conversation Transcript",
                                lines=20,
                                max_lines=30,
                                interactive=False
                            )

                            # Voice settings
                            voice_select = gr.Dropdown(
                                choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                                value="alloy",
                                label="Voice Selection"
                            )
                    # Voice handlers
                    async def connect_voice():
                        try:
                            await self.assistant.connect_realtime()
                            return "🟢 Connected to realtime voice"
                        except Exception as e:
                            return f"🔴 Connection failed: {str(e)}"

                    async def disconnect_voice():
                        try:
                            await self.assistant.disconnect()
                            return "⚪ Disconnected"
                        except Exception as e:
                            return f"🔴 Error: {str(e)}"

                    async def process_voice(audio_data):
                        if audio_data is None:
                            return None, ""

                        try:
                            # Convert audio format
                            sample_rate, audio = audio_data
                            audio_bytes = (audio * 32767).astype(np.int16).tobytes()

                            # Send audio
                            await self.assistant.send_audio(audio_bytes)

                            # Collect responses
                            transcript_text = ""
                            audio_chunks = []
                            async for response in self.assistant.stream_response():
                                if response["type"] == ResponseType.AUDIO:
                                    audio_chunks.append(response["data"])
                                elif response["type"] == ResponseType.TRANSCRIPT:
                                    transcript_text += response["data"]
                                elif response["type"] == ResponseType.DONE:
                                    break

                            # Process audio output
                            if audio_chunks:
                                combined = b''.join(audio_chunks)
                                audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0
                                return (24000, audio_array), transcript_text
                            return None, transcript_text
                        except Exception as e:
                            return None, f"Error: {str(e)}"

                    connect_btn.click(
                        connect_voice,
                        outputs=connection_status
                    )
                    disconnect_btn.click(
                        disconnect_voice,
                        outputs=connection_status
                    )
                    audio_input.stream(
                        process_voice,
                        audio_input,
                        [audio_output, transcript]
                    )
                # ============ TAB 3: DOCUMENT Q&A ============
                with gr.Tab("📄 Document Q&A", elem_id="doc-tab"):
                    gr.Markdown(
                        """
                        ### Query Your Documents
                        Ask questions about documents in your vector storage.
                        """
                    )

                    with gr.Row():
                        with gr.Column():
                            doc_query = gr.Textbox(
                                placeholder="Ask a question about your documents...",
                                label="Query",
                                lines=3
                            )
                            with gr.Row():
                                search_btn = gr.Button("🔍 Search", variant="primary")
                                stream_toggle = gr.Checkbox(label="Stream Response", value=True)

                            doc_response = gr.Textbox(
                                label="Response",
                                lines=15,
                                max_lines=25
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("### Vector Store Info")
                            vector_status = gr.Markdown(
                                f"**Store ID:** `{self.assistant.config.vector_store_id or 'Not configured'}`"
                            )
                            gr.Markdown(
                                """
                                ### Tips
                                - Be specific in your questions
                                - Reference document sections if known
                                - Ask for citations when needed
                                """
                            )
                    # Document Q&A handler
                    async def search_documents(query, stream):
                        if not query:
                            yield "Please enter a query."
                            return

                        try:
                            # query_documents is an async generator in both modes;
                            # accumulate chunks so the textbox shows the running response
                            response = ""
                            async for chunk in self.assistant.query_documents(query, stream=stream):
                                response += chunk
                                yield response
                        except Exception as e:
                            yield f"❌ Error: {str(e)}"

                    search_btn.click(
                        search_documents,
                        [doc_query, stream_toggle],
                        doc_response
                    )
                    doc_query.submit(
                        search_documents,
                        [doc_query, stream_toggle],
                        doc_response
                    )
                # ============ TAB 4: IMAGE ANALYSIS ============
                with gr.Tab("🖼️ Vision", elem_id="vision-tab"):
                    gr.Markdown(
                        """
                        ### Image Analysis with Voice
                        Upload an image and ask questions about it.
                        """
                    )

                    with gr.Row():
                        with gr.Column():
                            image_input = gr.Image(
                                type="filepath",
                                label="Upload Image"
                            )
                            image_query = gr.Textbox(
                                placeholder="What would you like to know about this image?",
                                label="Question",
                                lines=2
                            )
                            analyze_btn = gr.Button("🔍 Analyze", variant="primary")
                        with gr.Column():
                            image_response = gr.Textbox(
                                label="Analysis",
                                lines=10
                            )
                            image_audio = gr.Audio(
                                label="Voice Response",
                                type="numpy"
                            )

                    # Image analysis handler
                    async def analyze_image(image, query):
                        if not image or not query:
                            return "Please provide both an image and a question.", None

                        try:
                            text_response = ""
                            audio_chunks = []
                            async for response in self.assistant.analyze_image(image, query):
                                if response["type"] == ResponseType.TEXT:
                                    text_response += response["data"]
                                elif response["type"] == ResponseType.AUDIO:
                                    audio_chunks.append(response["data"])

                            # Process audio if available
                            audio_output = None
                            if audio_chunks:
                                combined = b''.join(audio_chunks)
                                audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0
                                audio_output = (24000, audio_array)

                            return text_response, audio_output
                        except Exception as e:
                            return f"❌ Error: {str(e)}", None

                    analyze_btn.click(
                        analyze_image,
                        [image_input, image_query],
                        [image_response, image_audio]
                    )
                # ============ TAB 5: SETTINGS ============
                with gr.Tab("⚙️ Settings", elem_id="settings-tab"):
                    gr.Markdown("### Model Configuration")

                    with gr.Row():
                        with gr.Column():
                            gr.Markdown("#### Model Information")
                            model_info = gr.JSON(
                                value={
                                    "model": self.assistant.config.model_name,
                                    "deployment": self.assistant.config.deployment_name,
                                    "context_window": 128000,
                                    "capabilities": [
                                        "realtime_audio",
                                        "streaming_text",
                                        "document_qa",
                                        "vision",
                                        "function_calling"
                                    ],
                                    "audio_format": self.assistant.config.audio_format,
                                    "sample_rate": self.assistant.config.sample_rate
                                },
                                label="Current Configuration"
                            )
                        with gr.Column():
                            gr.Markdown("#### Performance Metrics")
                            metrics = gr.JSON(
                                value={
                                    "voice_latency": "~300ms",
                                    "text_streaming": "~50ms/token",
                                    "document_search": "1-2 seconds",
                                    "image_analysis": "2-3 seconds"
                                },
                                label="Expected Performance"
                            )

            # Footer
            gr.Markdown(
                """
                ---
                <center>
                **Azure GPT-4o Realtime** | Single Model for All Tasks |
                [Documentation](https://docs.microsoft.com/azure/cognitive-services/openai/)
                </center>
                """
            )

        return interface
    def _get_custom_css(self) -> str:
        """Get custom CSS for the interface"""
        return """
        #chatbot {
            height: 500px !important;
        }
        .gradio-container {
            max-width: 1400px !important;
            margin: auto !important;
        }
        #voice-tab .gradio-audio {
            height: 120px !important;
        }
        .markdown-text {
            font-size: 14px !important;
        }
        """
# ========================= AUDIO PROCESSOR =========================
class AudioProcessor:
    """Handle audio format conversions and processing"""

    @staticmethod
    def numpy_to_pcm16(audio_array: np.ndarray) -> bytes:
        """Convert float numpy array (-1.0 to 1.0) to PCM16 bytes"""
        return (audio_array * 32767).astype(np.int16).tobytes()

    @staticmethod
    def pcm16_to_numpy(audio_bytes: bytes) -> np.ndarray:
        """Convert PCM16 bytes to float numpy array (-1.0 to 1.0)"""
        return np.frombuffer(audio_bytes, dtype=np.int16) / 32767.0
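
# A quick round-trip check for the helpers above (illustrative sketch; the
# sample values and tolerance are assumptions, not part of the original app):
#
#     samples = np.linspace(-1.0, 1.0, num=24000, dtype=np.float32)
#     pcm = AudioProcessor.numpy_to_pcm16(samples)
#     restored = AudioProcessor.pcm16_to_numpy(pcm)
#     assert np.allclose(samples, restored, atol=1e-3)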
# ========================= MAIN APPLICATION =========================
async def main():
    """Main application entry point"""
    # Load configuration from environment
    config = UnifiedAssistantConfig(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=os.getenv("AZURE_API_VERSION", "2024-10-01-preview"),
        deployment_name=os.getenv("AZURE_DEPLOYMENT_NAME", "gpt-4o-realtime-preview"),
        vector_store_id=os.getenv("AZURE_VECTOR_STORE_ID"),
        voice=os.getenv("AZURE_VOICE", "alloy"),
        temperature=float(os.getenv("TEMPERATURE", "0.7")),
        max_tokens=int(os.getenv("MAX_TOKENS", "4096"))
    )

    # Validate configuration
    if not config.azure_endpoint or not config.api_key:
        logger.error("❌ Missing required environment variables")
        raise ValueError(
            "Please set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY environment variables"
        )

    if not config.vector_store_id:
        logger.warning("⚠️ No vector store ID configured - document Q&A will be limited")

    try:
        # Initialize assistant
        logger.info("🚀 Initializing GPT-4o Realtime Unified Assistant...")
        assistant = GPT4oRealtimeAssistant(config)

        # Create Gradio interface
        logger.info("🎨 Creating Gradio interface...")
        interface_manager = UnifiedGradioInterface(assistant)
        interface = interface_manager.create_interface()

        # Launch application
        logger.info("✅ Launching application...")
        interface.queue(max_size=100)  # Enable queuing for concurrent users
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,  # Create public URL
            show_error=True,
            quiet=False
        )
    except KeyboardInterrupt:
        logger.info("👋 Shutting down gracefully...")
        if assistant.session_active:
            await assistant.disconnect()
    except Exception as e:
        logger.error(f"❌ Fatal error: {str(e)}")
        raise
def run():
    """Synchronous entry point"""
    asyncio.run(main())


if __name__ == "__main__":
    run()
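
# --- Environment sketch (assumptions, not part of the original post) ---------
# The imports above imply roughly these pip packages; exact versions are not
# specified in the source:
#
#     pip install gradio numpy websockets openai python-dotenv
#
# And a .env file along these lines (the variable names come from the
# os.getenv calls in main(); the values are placeholders):
#
#     AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com
#     AZURE_OPENAI_API_KEY=<your-key>
#     AZURE_API_VERSION=2024-10-01-preview
#     AZURE_DEPLOYMENT_NAME=gpt-4o-realtime-preview
#     AZURE_VECTOR_STORE_ID=<optional vector store id>
#     AZURE_VOICE=alloy
#     TEMPERATURE=0.7
#     MAX_TOKENS=4096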