# azure_unified_assistant.py """ Azure OpenAI Unified Assistant with GPT-4o Realtime Supports: Voice I/O, Streaming Text, Document Q&A, Vision """ import os import json import asyncio import base64 import queue import threading import time from datetime import datetime from typing import Optional, Dict, Any, List, Tuple, AsyncGenerator from dataclasses import dataclass, field from enum import Enum import gradio as gr import numpy as np import websockets from openai import AzureOpenAI, AsyncAzureOpenAI from dotenv import load_dotenv import logging # Load environment variables load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # ========================= CONFIGURATION ========================= @dataclass class UnifiedAssistantConfig: """Configuration for GPT-4o Realtime Unified Assistant""" # Azure Configuration azure_endpoint: str api_key: str api_version: str = "2024-10-01-preview" # Model Configuration model_name: str = "gpt-4o-realtime-preview-2024-10-01" deployment_name: str = "gpt-4o-realtime-preview" # Vector Storage vector_store_id: Optional[str] = None # Audio Configuration voice: str = "alloy" # alloy, echo, fable, onyx, nova, shimmer audio_format: str = "pcm16" sample_rate: int = 24000 # Model Parameters temperature: float = 0.7 max_tokens: int = 4096 top_p: float = 0.95 # System Configuration enable_streaming: bool = True enable_voice: bool = True enable_vision: bool = True enable_functions: bool = True # WebSocket Configuration ws_timeout: int = 60 ws_max_size: int = 10 * 1024 * 1024 # 10MB class ResponseType(Enum): """Response types from the unified model""" TEXT = "text" AUDIO = "audio" TRANSCRIPT = "transcript" FUNCTION = "function" ERROR = "error" DONE = "done" # ========================= UNIFIED ASSISTANT ========================= class GPT4oRealtimeAssistant: """ Unified Assistant using GPT-4o Realtime Preview Single model for all capabilities """ def __init__(self, config: UnifiedAssistantConfig): self.config = config self.websocket = None self.session_active = False self.conversation_id = None # Initialize clients self._initialize_clients() # Audio handling self.audio_queue = queue.Queue() self.is_recording = False logger.info(f"✅ Initialized GPT-4o Realtime Assistant with model: {config.model_name}") def _initialize_clients(self): """Initialize both REST and WebSocket clients""" try: # REST client for standard operations self.rest_client = AzureOpenAI( azure_endpoint=self.config.azure_endpoint, api_key=self.config.api_key, api_version=self.config.api_version ) # Async REST client for streaming self.async_client = AsyncAzureOpenAI( azure_endpoint=self.config.azure_endpoint, api_key=self.config.api_key, api_version=self.config.api_version ) logger.info("✅ REST clients initialized successfully") except Exception as e: logger.error(f"❌ Failed to initialize clients: {str(e)}") raise # ==================== REALTIME WEBSOCKET CONNECTION ==================== async def connect_realtime(self): """Establish WebSocket connection for realtime interaction""" if self.session_active: logger.info("Session already active") return try: # Construct WebSocket URL ws_endpoint = self.config.azure_endpoint.replace("https://", "wss://") ws_url = f"{ws_endpoint}/openai/realtime?api-version={self.config.api_version}&deployment={self.config.deployment_name}" headers = { "api-key": self.config.api_key, "OpenAI-Beta": "realtime=v1" } # Connect with timeout self.websocket = await asyncio.wait_for( websockets.connect( ws_url, extra_headers=headers, max_size=self.config.ws_max_size ), timeout=self.config.ws_timeout ) # Configure session await self._configure_session() self.session_active = True self.conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}" logger.info(f"✅ Realtime session connected: {self.conversation_id}") except asyncio.TimeoutError: logger.error("❌ WebSocket connection timeout") raise except Exception as e: logger.error(f"❌ Failed to connect: {str(e)}") raise async def _configure_session(self): """Configure the realtime session with all capabilities""" session_config = { "type": "session.update", "session": { "model": self.config.model_name, "modalities": ["text", "audio"], "voice": self.config.voice, "instructions": """You are a helpful AI assistant with access to: 1. Realtime voice conversations 2. Document search and analysis 3. Streaming text responses 4. Image understanding 5. Function calling capabilities Provide clear, accurate, and helpful responses. For document questions, cite sources when available. Keep voice responses concise and natural.""", # Audio Configuration "input_audio_format": self.config.audio_format, "output_audio_format": self.config.audio_format, "input_audio_transcription": { "model": "whisper-1" }, # Voice Activity Detection "turn_detection": { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500 }, # Tools Configuration "tools": self._get_tools_config(), "tool_choice": "auto", # Model Parameters "temperature": self.config.temperature, "max_response_output_tokens": self.config.max_tokens } } await self.websocket.send(json.dumps(session_config)) # Wait for session confirmation response = await self.websocket.recv() data = json.loads(response) if data.get("type") == "session.created": logger.info("✅ Session configured successfully") else: logger.warning(f"⚠️ Unexpected session response: {data.get('type')}") def _get_tools_config(self) -> List[Dict]: """Get tools configuration based on settings""" tools = [] if self.config.vector_store_id: tools.append({ "type": "file_search", "file_search": { "vector_store_ids": [self.config.vector_store_id] } }) if self.config.enable_functions: tools.append({ "type": "code_interpreter" }) return tools # ==================== AUDIO HANDLING ==================== async def send_audio(self, audio_data: bytes): """Send audio input to the model""" if not self.session_active: await self.connect_realtime() try: # Convert to base64 audio_base64 = base64.b64encode(audio_data).decode('utf-8') message = { "type": "input_audio_buffer.append", "audio": audio_base64 } await self.websocket.send(json.dumps(message)) # Commit audio buffer to trigger processing await self.websocket.send(json.dumps({ "type": "input_audio_buffer.commit" })) except Exception as e: logger.error(f"❌ Error sending audio: {str(e)}") raise async def process_audio_stream(self, audio_stream: AsyncGenerator[bytes, None]): """Process continuous audio stream""" if not self.session_active: await self.connect_realtime() try: async for audio_chunk in audio_stream: await self.send_audio(audio_chunk) except Exception as e: logger.error(f"❌ Error processing audio stream: {str(e)}") raise # ==================== TEXT HANDLING ==================== async def send_text(self, text: str, use_realtime: bool = False): """Send text input - can use either realtime or REST API""" if use_realtime: # Use realtime WebSocket if not self.session_active: await self.connect_realtime() message = { "type": "conversation.item.create", "item": { "type": "message", "role": "user", "content": [ { "type": "input_text", "text": text } ] } } await self.websocket.send(json.dumps(message)) # Trigger response await self.websocket.send(json.dumps({ "type": "response.create", "response": { "modalities": ["text", "audio"] if self.config.enable_voice else ["text"] } })) else: # Use REST API for text-only return self.rest_client.chat.completions.create( model=self.config.deployment_name, messages=[ {"role": "user", "content": text} ], stream=self.config.enable_streaming, temperature=self.config.temperature, max_tokens=self.config.max_tokens ) # ==================== STREAMING RESPONSES ==================== async def stream_response(self) -> AsyncGenerator[Dict[str, Any], None]: """Stream responses from the realtime connection""" if not self.session_active: logger.error("❌ No active session") return try: while True: message = await asyncio.wait_for( self.websocket.recv(), timeout=self.config.ws_timeout ) data = json.loads(message) response_type = data.get("type", "") # Handle different response types if response_type == "response.audio.delta": yield { "type": ResponseType.AUDIO, "data": base64.b64decode(data.get("delta", "")) } elif response_type == "response.text.delta": yield { "type": ResponseType.TEXT, "data": data.get("delta", "") } elif response_type == "response.audio_transcript.delta": yield { "type": ResponseType.TRANSCRIPT, "data": data.get("delta", "") } elif response_type == "response.function_call_arguments.delta": yield { "type": ResponseType.FUNCTION, "data": json.loads(data.get("delta", "{}")) } elif response_type == "response.done": yield { "type": ResponseType.DONE, "data": None } break elif response_type == "error": yield { "type": ResponseType.ERROR, "data": data.get("error", {}) } break except asyncio.TimeoutError: logger.error("❌ Response timeout") yield { "type": ResponseType.ERROR, "data": {"message": "Response timeout"} } except Exception as e: logger.error(f"❌ Stream error: {str(e)}") yield { "type": ResponseType.ERROR, "data": {"message": str(e)} } # ==================== DOCUMENT Q&A ==================== async def query_documents(self, query: str, stream: bool = True): """Query documents using vector search""" if not self.config.vector_store_id: return "No vector store configured. Please set up a vector store ID." try: # Create assistant with vector search assistant = self.rest_client.beta.assistants.create( model=self.config.deployment_name, name="Document Q&A Assistant", instructions="""Answer questions based on the provided documents. Cite sources when possible. If information is not in the documents, say so clearly.""", tools=[{"type": "file_search"}], tool_resources={ "file_search": { "vector_store_ids": [self.config.vector_store_id] } }, temperature=self.config.temperature ) # Create thread thread = self.rest_client.beta.threads.create() # Add message self.rest_client.beta.threads.messages.create( thread_id=thread.id, role="user", content=query ) # Run with streaming if stream: with self.rest_client.beta.threads.runs.create_and_stream( thread_id=thread.id, assistant_id=assistant.id, ) as stream: for event in stream: if event.event == 'thread.message.delta': if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'content'): for content in event.data.delta.content: if content.type == 'text' and hasattr(content.text, 'value'): yield content.text.value else: # Non-streaming response run = self.rest_client.beta.threads.runs.create( thread_id=thread.id, assistant_id=assistant.id ) # Wait for completion while run.status in ['queued', 'in_progress']: time.sleep(1) run = self.rest_client.beta.threads.runs.retrieve( thread_id=thread.id, run_id=run.id ) if run.status == 'completed': messages = self.rest_client.beta.threads.messages.list( thread_id=thread.id ) if messages.data: return messages.data[0].content[0].text.value else: return f"Query failed: {run.status}" except Exception as e: logger.error(f"❌ Document query error: {str(e)}") return f"Error querying documents: {str(e)}" # ==================== MULTIMODAL HANDLING ==================== async def analyze_image(self, image_path: str, question: str): """Analyze image with text/voice response""" try: # Read and encode image with open(image_path, "rb") as image_file: image_data = base64.b64encode(image_file.read()).decode('utf-8') if self.session_active: # Use realtime connection for voice response message = { "type": "conversation.item.create", "item": { "type": "message", "role": "user", "content": [ { "type": "input_text", "text": question }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": "high" } } ] } } await self.websocket.send(json.dumps(message)) # Trigger response await self.websocket.send(json.dumps({ "type": "response.create" })) # Stream response async for response in self.stream_response(): yield response else: # Use REST API response = self.rest_client.chat.completions.create( model=self.config.deployment_name, messages=[ { "role": "user", "content": [ {"type": "text", "text": question}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": "high" } } ] } ], max_tokens=self.config.max_tokens, temperature=self.config.temperature ) yield { "type": ResponseType.TEXT, "data": response.choices[0].message.content } except Exception as e: logger.error(f"❌ Image analysis error: {str(e)}") yield { "type": ResponseType.ERROR, "data": {"message": str(e)} } # ==================== CLEANUP ==================== async def disconnect(self): """Disconnect realtime session""" if self.websocket: await self.websocket.close() self.session_active = False logger.info("✅ Disconnected from realtime session") # ========================= GRADIO INTERFACE ========================= class UnifiedGradioInterface: """Enhanced Gradio interface for the unified assistant""" def __init__(self, assistant: GPT4oRealtimeAssistant): self.assistant = assistant self.conversation_history = [] self.audio_processor = AudioProcessor() def create_interface(self) -> gr.Blocks: """Create the complete Gradio interface""" with gr.Blocks( theme=gr.themes.Soft(), title="Azure GPT-4o Realtime Assistant", css=self._get_custom_css() ) as interface: # Header gr.Markdown( """ # 🌟 Azure GPT-4o Realtime Assistant **Unified Model with All Capabilities:** - 🎙️ **Voice:** Realtime voice conversations with ~300ms latency - ⚡ **Streaming:** Live text streaming responses - 📚 **Documents:** Vector search and Q&A on your documents - 🖼️ **Vision:** Image analysis and understanding - 🔧 **Functions:** Tool calling and code interpretation **Model:** `gpt-4o-realtime-preview-2024-10-01` | **Context:** 128K tokens """ ) # State management state = gr.State({ "session_active": False, "conversation_id": None }) with gr.Tabs() as tabs: # ============ TAB 1: STREAMING CHAT ============ with gr.Tab("💬 Streaming Chat", elem_id="chat-tab"): with gr.Row(): with gr.Column(scale=4): chatbot = gr.Chatbot( height=500, show_label=False, elem_id="chatbot", bubble_full_width=False ) with gr.Row(): msg = gr.Textbox( placeholder="Type your message here... (Press Enter to send)", label="Message", scale=4, lines=2 ) with gr.Column(scale=1): send_btn = gr.Button("Send", variant="primary", size="lg") clear_btn = gr.Button("Clear", variant="secondary") with gr.Column(scale=1): gr.Markdown("### Settings") streaming_mode = gr.Radio( choices=["Stream", "Complete"], value="Stream", label="Response Mode" ) temperature = gr.Slider( minimum=0, maximum=1, value=0.7, step=0.1, label="Temperature" ) gr.Markdown("### Status") status_display = gr.Markdown("🟢 Ready") # Chat handlers async def handle_chat(message, history, mode): if not message: return history history = history or [] history.append([message, ""]) try: if mode == "Stream": response = "" # Stream text response await self.assistant.send_text(message, use_realtime=True) async for chunk in self.assistant.stream_response(): if chunk["type"] == ResponseType.TEXT: response += chunk["data"] history[-1][1] = response yield history elif chunk["type"] == ResponseType.DONE: break else: # Complete response result = await self.assistant.send_text(message, use_realtime=False) history[-1][1] = result.choices[0].message.content yield history except Exception as e: history[-1][1] = f"❌ Error: {str(e)}" yield history msg.submit( handle_chat, [msg, chatbot, streaming_mode], chatbot ).then( lambda: "", outputs=msg ) send_btn.click( handle_chat, [msg, chatbot, streaming_mode], chatbot ).then( lambda: "", outputs=msg ) clear_btn.click( lambda: ([], ""), outputs=[chatbot, msg] ) # ============ TAB 2: VOICE INTERACTION ============ with gr.Tab("🎙️ Voice Chat", elem_id="voice-tab"): gr.Markdown( """ ### Realtime Voice Interaction Talk naturally with the assistant using your microphone. """ ) with gr.Row(): with gr.Column(): # Connection control with gr.Row(): connect_btn = gr.Button("🔌 Connect Voice", variant="primary") disconnect_btn = gr.Button("🔌 Disconnect", variant="stop") connection_status = gr.Markdown("⚪ Not connected") # Audio interface audio_input = gr.Audio( sources=["microphone"], type="numpy", streaming=True, label="🎤 Speak to Assistant" ) audio_output = gr.Audio( label="🔊 Assistant Response", type="numpy", autoplay=True ) with gr.Column(): # Conversation display transcript = gr.Textbox( label="Conversation Transcript", lines=20, max_lines=30, interactive=False ) # Voice settings voice_select = gr.Dropdown( choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"], value="alloy", label="Voice Selection" ) # Voice handlers async def connect_voice(): try: await self.assistant.connect_realtime() return "🟢 Connected to realtime voice" except Exception as e: return f"🔴 Connection failed: {str(e)}" async def disconnect_voice(): try: await self.assistant.disconnect() return "⚪ Disconnected" except Exception as e: return f"🔴 Error: {str(e)}" async def process_voice(audio_data): if audio_data is None: return None, "" try: # Convert audio format sample_rate, audio = audio_data audio_bytes = (audio * 32767).astype(np.int16).tobytes() # Send audio await self.assistant.send_audio(audio_bytes) # Collect responses transcript_text = "" audio_chunks = [] async for response in self.assistant.stream_response(): if response["type"] == ResponseType.AUDIO: audio_chunks.append(response["data"]) elif response["type"] == ResponseType.TRANSCRIPT: transcript_text += response["data"] elif response["type"] == ResponseType.DONE: break # Process audio output if audio_chunks: combined = b''.join(audio_chunks) audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0 return (24000, audio_array), transcript_text return None, transcript_text except Exception as e: return None, f"Error: {str(e)}" connect_btn.click( connect_voice, outputs=connection_status ) disconnect_btn.click( disconnect_voice, outputs=connection_status ) audio_input.stream( process_voice, audio_input, [audio_output, transcript] ) # ============ TAB 3: DOCUMENT Q&A ============ with gr.Tab("📚 Document Q&A", elem_id="doc-tab"): gr.Markdown( """ ### Query Your Documents Ask questions about documents in your vector storage. """ ) with gr.Row(): with gr.Column(): doc_query = gr.Textbox( placeholder="Ask a question about your documents...", label="Query", lines=3 ) with gr.Row(): search_btn = gr.Button("🔍 Search", variant="primary") stream_toggle = gr.Checkbox(label="Stream Response", value=True) doc_response = gr.Textbox( label="Response", lines=15, max_lines=25 ) with gr.Column(scale=1): gr.Markdown("### Vector Store Info") vector_status = gr.Markdown( f"**Store ID:** `{self.assistant.config.vector_store_id or 'Not configured'}`" ) gr.Markdown( """ ### Tips - Be specific in your questions - Reference document sections if known - Ask for citations when needed """ ) # Document Q&A handler async def search_documents(query, stream): if not query: return "Please enter a query." try: if stream: response = "" async for chunk in self.assistant.query_documents(query, stream=True): response += chunk yield response else: response = await self.assistant.query_documents(query, stream=False) yield response except Exception as e: yield f"❌ Error: {str(e)}" search_btn.click( search_documents, [doc_query, stream_toggle], doc_response ) doc_query.submit( search_documents, [doc_query, stream_toggle], doc_response ) # ============ TAB 4: IMAGE ANALYSIS ============ with gr.Tab("🖼️ Vision", elem_id="vision-tab"): gr.Markdown( """ ### Image Analysis with Voice Upload an image and ask questions about it. """ ) with gr.Row(): with gr.Column(): image_input = gr.Image( type="filepath", label="Upload Image" ) image_query = gr.Textbox( placeholder="What would you like to know about this image?", label="Question", lines=2 ) analyze_btn = gr.Button("🔍 Analyze", variant="primary") with gr.Column(): image_response = gr.Textbox( label="Analysis", lines=10 ) image_audio = gr.Audio( label="Voice Response", type="numpy" ) # Image analysis handler async def analyze_image(image, query): if not image or not query: return "Please provide both an image and a question.", None try: text_response = "" audio_chunks = [] async for response in self.assistant.analyze_image(image, query): if response["type"] == ResponseType.TEXT: text_response += response["data"] elif response["type"] == ResponseType.AUDIO: audio_chunks.append(response["data"]) # Process audio if available audio_output = None if audio_chunks: combined = b''.join(audio_chunks) audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0 audio_output = (24000, audio_array) return text_response, audio_output except Exception as e: return f"❌ Error: {str(e)}", None analyze_btn.click( analyze_image, [image_input, image_query], [image_response, image_audio] ) # ============ TAB 5: SETTINGS ============ with gr.Tab("⚙️ Settings", elem_id="settings-tab"): gr.Markdown("### Model Configuration") with gr.Row(): with gr.Column(): gr.Markdown("#### Model Information") model_info = gr.JSON( value={ "model": self.assistant.config.model_name, "deployment": self.assistant.config.deployment_name, "context_window": 128000, "capabilities": [ "realtime_audio", "streaming_text", "document_qa", "vision", "function_calling" ], "audio_format": self.assistant.config.audio_format, "sample_rate": self.assistant.config.sample_rate }, label="Current Configuration" ) with gr.Column(): gr.Markdown("#### Performance Metrics") metrics = gr.JSON( value={ "voice_latency": "~300ms", "text_streaming": "~50ms/token", "document_search": "1-2 seconds", "image_analysis": "2-3 seconds" }, label="Expected Performance" ) # Footer gr.Markdown( """ ---
**Azure GPT-4o Realtime** | Single Model for All Tasks | [Documentation](https://docs.microsoft.com/azure/cognitive-services/openai/)
""" ) return interface def _get_custom_css(self) -> str: """Get custom CSS for the interface""" return """ #chatbot { height: 500px !important; } .gradio-container { max-width: 1400px !important; margin: auto !important; } #voice-tab .gradio-audio { height: 120px !important; } .markdown-text { font-size: 14px !important; } """ # ========================= AUDIO PROCESSOR ========================= class AudioProcessor: """Handle audio format conversions and processing""" @staticmethod def numpy_to_pcm16(audio_array: np.ndarray) -> bytes: """Convert numpy array to PCM16 bytes""" return (audio_array * 32767).astype(np.int16).tobytes() @staticmethod def pcm16_to_numpy(audio_bytes: bytes) -> np.ndarray: """Convert PCM16 bytes to numpy array""" return np.frombuffer(audio_bytes, dtype=np.int16) / 32767.0 # ========================= MAIN APPLICATION ========================= async def main(): """Main application entry point""" # Load configuration from environment config = UnifiedAssistantConfig( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_API_VERSION", "2024-10-01-preview"), deployment_name=os.getenv("AZURE_DEPLOYMENT_NAME", "gpt-4o-realtime-preview"), vector_store_id=os.getenv("AZURE_VECTOR_STORE_ID"), voice=os.getenv("AZURE_VOICE", "alloy"), temperature=float(os.getenv("TEMPERATURE", "0.7")), max_tokens=int(os.getenv("MAX_TOKENS", "4096")) ) # Validate configuration if not config.azure_endpoint or not config.api_key: logger.error("❌ Missing required environment variables") raise ValueError( "Please set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY environment variables" ) if not config.vector_store_id: logger.warning("⚠️ No vector store ID configured - document Q&A will be limited") try: # Initialize assistant logger.info("🚀 Initializing GPT-4o Realtime Unified Assistant...") assistant = GPT4oRealtimeAssistant(config) # Create Gradio interface logger.info("🎨 Creating Gradio interface...") interface_manager = UnifiedGradioInterface(assistant) interface = interface_manager.create_interface() # Launch application logger.info("✅ Launching application...") interface.queue(max_size=100) # Enable queuing for concurrent users interface.launch( server_name="0.0.0.0", server_port=7860, share=True, # Create public URL show_error=True, quiet=False ) except KeyboardInterrupt: logger.info("👋 Shutting down gracefully...") if assistant.session_active: await assistant.disconnect() except Exception as e: logger.error(f"❌ Fatal error: {str(e)}") raise def run(): """Synchronous entry point""" asyncio.run(main()) if __name__ == "__main__": run()