# azure_unified_assistant.py
"""
Azure OpenAI Unified Assistant with GPT-4o Realtime
Supports: Voice I/O, Streaming Text, Document Q&A, Vision
"""
import os
import json
import asyncio
import base64
import queue
import threading
import time
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple, AsyncGenerator
from dataclasses import dataclass, field
from enum import Enum
import gradio as gr
import numpy as np
import websockets
from openai import AzureOpenAI, AsyncAzureOpenAI
from dotenv import load_dotenv
import logging
# Load environment variables
load_dotenv()
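# Environment variables consumed in main() at the bottom of this file:
#   required: AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY
#   optional: AZURE_API_VERSION, AZURE_DEPLOYMENT_NAME, AZURE_VECTOR_STORE_ID,
#             AZURE_VOICE, TEMPERATURE, MAX_TOKENS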
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ========================= CONFIGURATION =========================
@dataclass
class UnifiedAssistantConfig:
"""Configuration for GPT-4o Realtime Unified Assistant"""
# Azure Configuration
azure_endpoint: str
api_key: str
api_version: str = "2024-10-01-preview"
# Model Configuration
model_name: str = "gpt-4o-realtime-preview-2024-10-01"
deployment_name: str = "gpt-4o-realtime-preview"
# Vector Storage
vector_store_id: Optional[str] = None
# Audio Configuration
voice: str = "alloy" # alloy, echo, fable, onyx, nova, shimmer
audio_format: str = "pcm16"
sample_rate: int = 24000
# Model Parameters
temperature: float = 0.7
max_tokens: int = 4096
top_p: float = 0.95
# System Configuration
enable_streaming: bool = True
enable_voice: bool = True
enable_vision: bool = True
enable_functions: bool = True
# WebSocket Configuration
ws_timeout: int = 60
ws_max_size: int = 10 * 1024 * 1024 # 10MB
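# Illustrative only: the config can also be constructed directly instead of from
# environment variables (endpoint and key below are placeholders), e.g.
#   config = UnifiedAssistantConfig(
#       azure_endpoint="https://<your-resource>.openai.azure.com",
#       api_key="<your-api-key>",
#   )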
class ResponseType(Enum):
"""Response types from the unified model"""
TEXT = "text"
AUDIO = "audio"
TRANSCRIPT = "transcript"
FUNCTION = "function"
ERROR = "error"
DONE = "done"
# ========================= UNIFIED ASSISTANT =========================
class GPT4oRealtimeAssistant:
"""
Unified Assistant using GPT-4o Realtime Preview
Single model for all capabilities
"""
def __init__(self, config: UnifiedAssistantConfig):
self.config = config
self.websocket = None
self.session_active = False
self.conversation_id = None
# Initialize clients
self._initialize_clients()
# Audio handling
self.audio_queue = queue.Queue()
self.is_recording = False
logger.info(f"βœ… Initialized GPT-4o Realtime Assistant with model: {config.model_name}")
def _initialize_clients(self):
"""Initialize both REST and WebSocket clients"""
try:
# REST client for standard operations
self.rest_client = AzureOpenAI(
azure_endpoint=self.config.azure_endpoint,
api_key=self.config.api_key,
api_version=self.config.api_version
)
# Async REST client for streaming
self.async_client = AsyncAzureOpenAI(
azure_endpoint=self.config.azure_endpoint,
api_key=self.config.api_key,
api_version=self.config.api_version
)
logger.info("βœ… REST clients initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize clients: {str(e)}")
raise
# ==================== REALTIME WEBSOCKET CONNECTION ====================
async def connect_realtime(self):
"""Establish WebSocket connection for realtime interaction"""
if self.session_active:
logger.info("Session already active")
return
try:
# Construct WebSocket URL
ws_endpoint = self.config.azure_endpoint.replace("https://", "wss://")
ws_url = f"{ws_endpoint}/openai/realtime?api-version={self.config.api_version}&deployment={self.config.deployment_name}"
headers = {
"api-key": self.config.api_key,
"OpenAI-Beta": "realtime=v1"
}
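            # Note: "extra_headers" in the connect call below matches older
            # websockets releases; newer versions (>=14) renamed this keyword
            # to "additional_headers".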
# Connect with timeout
self.websocket = await asyncio.wait_for(
websockets.connect(
ws_url,
extra_headers=headers,
max_size=self.config.ws_max_size
),
timeout=self.config.ws_timeout
)
# Configure session
await self._configure_session()
self.session_active = True
self.conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.info(f"βœ… Realtime session connected: {self.conversation_id}")
except asyncio.TimeoutError:
logger.error("❌ WebSocket connection timeout")
raise
except Exception as e:
logger.error(f"❌ Failed to connect: {str(e)}")
raise
async def _configure_session(self):
"""Configure the realtime session with all capabilities"""
session_config = {
"type": "session.update",
"session": {
"model": self.config.model_name,
"modalities": ["text", "audio"],
"voice": self.config.voice,
"instructions": """You are a helpful AI assistant with access to:
1. Realtime voice conversations
2. Document search and analysis
3. Streaming text responses
4. Image understanding
5. Function calling capabilities
Provide clear, accurate, and helpful responses.
For document questions, cite sources when available.
Keep voice responses concise and natural.""",
# Audio Configuration
"input_audio_format": self.config.audio_format,
"output_audio_format": self.config.audio_format,
"input_audio_transcription": {
"model": "whisper-1"
},
# Voice Activity Detection
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500
},
# Tools Configuration
"tools": self._get_tools_config(),
"tool_choice": "auto",
# Model Parameters
"temperature": self.config.temperature,
"max_response_output_tokens": self.config.max_tokens
}
}
await self.websocket.send(json.dumps(session_config))
# Wait for session confirmation
response = await self.websocket.recv()
data = json.loads(response)
if data.get("type") == "session.created":
logger.info("βœ… Session configured successfully")
else:
logger.warning(f"⚠️ Unexpected session response: {data.get('type')}")
def _get_tools_config(self) -> List[Dict]:
"""Get tools configuration based on settings"""
tools = []
if self.config.vector_store_id:
tools.append({
"type": "file_search",
"file_search": {
"vector_store_ids": [self.config.vector_store_id]
}
})
if self.config.enable_functions:
tools.append({
"type": "code_interpreter"
})
return tools
# ==================== AUDIO HANDLING ====================
async def send_audio(self, audio_data: bytes):
"""Send audio input to the model"""
if not self.session_active:
await self.connect_realtime()
try:
# Convert to base64
audio_base64 = base64.b64encode(audio_data).decode('utf-8')
message = {
"type": "input_audio_buffer.append",
"audio": audio_base64
}
await self.websocket.send(json.dumps(message))
# Commit audio buffer to trigger processing
await self.websocket.send(json.dumps({
"type": "input_audio_buffer.commit"
}))
except Exception as e:
logger.error(f"❌ Error sending audio: {str(e)}")
raise
async def process_audio_stream(self, audio_stream: AsyncGenerator[bytes, None]):
"""Process continuous audio stream"""
if not self.session_active:
await self.connect_realtime()
try:
async for audio_chunk in audio_stream:
await self.send_audio(audio_chunk)
except Exception as e:
logger.error(f"❌ Error processing audio stream: {str(e)}")
raise
# ==================== TEXT HANDLING ====================
async def send_text(self, text: str, use_realtime: bool = False):
"""Send text input - can use either realtime or REST API"""
if use_realtime:
# Use realtime WebSocket
if not self.session_active:
await self.connect_realtime()
message = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": text
}
]
}
}
await self.websocket.send(json.dumps(message))
# Trigger response
await self.websocket.send(json.dumps({
"type": "response.create",
"response": {
"modalities": ["text", "audio"] if self.config.enable_voice else ["text"]
}
}))
else:
            # Use the REST API for a complete (non-streamed) response; the
            # realtime branch above handles streaming, and callers read
            # .choices[0].message.content from this return value.
            return self.rest_client.chat.completions.create(
                model=self.config.deployment_name,
                messages=[
                    {"role": "user", "content": text}
                ],
                stream=False,
                temperature=self.config.temperature,
                max_tokens=self.config.max_tokens
            )
# ==================== STREAMING RESPONSES ====================
async def stream_response(self) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream responses from the realtime connection"""
if not self.session_active:
logger.error("❌ No active session")
return
try:
while True:
message = await asyncio.wait_for(
self.websocket.recv(),
timeout=self.config.ws_timeout
)
data = json.loads(message)
response_type = data.get("type", "")
# Handle different response types
if response_type == "response.audio.delta":
yield {
"type": ResponseType.AUDIO,
"data": base64.b64decode(data.get("delta", ""))
}
elif response_type == "response.text.delta":
yield {
"type": ResponseType.TEXT,
"data": data.get("delta", "")
}
elif response_type == "response.audio_transcript.delta":
yield {
"type": ResponseType.TRANSCRIPT,
"data": data.get("delta", "")
}
elif response_type == "response.function_call_arguments.delta":
yield {
"type": ResponseType.FUNCTION,
"data": json.loads(data.get("delta", "{}"))
}
elif response_type == "response.done":
yield {
"type": ResponseType.DONE,
"data": None
}
break
elif response_type == "error":
yield {
"type": ResponseType.ERROR,
"data": data.get("error", {})
}
break
except asyncio.TimeoutError:
logger.error("❌ Response timeout")
yield {
"type": ResponseType.ERROR,
"data": {"message": "Response timeout"}
}
except Exception as e:
logger.error(f"❌ Stream error: {str(e)}")
yield {
"type": ResponseType.ERROR,
"data": {"message": str(e)}
}
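    # Typical consumption (see handle_chat and process_voice in the UI below):
    #   async for chunk in assistant.stream_response():
    #       if chunk["type"] == ResponseType.TEXT:
    #           ...handle the text delta...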
# ==================== DOCUMENT Q&A ====================
async def query_documents(self, query: str, stream: bool = True):
"""Query documents using vector search"""
if not self.config.vector_store_id:
return "No vector store configured. Please set up a vector store ID."
try:
# Create assistant with vector search
assistant = self.rest_client.beta.assistants.create(
model=self.config.deployment_name,
name="Document Q&A Assistant",
instructions="""Answer questions based on the provided documents.
Cite sources when possible.
If information is not in the documents, say so clearly.""",
tools=[{"type": "file_search"}],
tool_resources={
"file_search": {
"vector_store_ids": [self.config.vector_store_id]
}
},
temperature=self.config.temperature
)
# Create thread
thread = self.rest_client.beta.threads.create()
# Add message
self.rest_client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content=query
)
# Run with streaming
if stream:
with self.rest_client.beta.threads.runs.create_and_stream(
thread_id=thread.id,
assistant_id=assistant.id,
) as stream:
for event in stream:
if event.event == 'thread.message.delta':
if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'content'):
for content in event.data.delta.content:
if content.type == 'text' and hasattr(content.text, 'value'):
yield content.text.value
else:
# Non-streaming response
run = self.rest_client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
)
                # Wait for completion without blocking the event loop
                while run.status in ['queued', 'in_progress']:
                    await asyncio.sleep(1)
                    run = self.rest_client.beta.threads.runs.retrieve(
                        thread_id=thread.id,
                        run_id=run.id
                    )
                if run.status == 'completed':
                    messages = self.rest_client.beta.threads.messages.list(
                        thread_id=thread.id
                    )
                    if messages.data:
                        yield messages.data[0].content[0].text.value
                else:
                    yield f"Query failed: {run.status}"
        except Exception as e:
            logger.error(f"❌ Document query error: {str(e)}")
            yield f"Error querying documents: {str(e)}"
# ==================== MULTIMODAL HANDLING ====================
async def analyze_image(self, image_path: str, question: str):
"""Analyze image with text/voice response"""
try:
# Read and encode image
with open(image_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode('utf-8')
if self.session_active:
# Use realtime connection for voice response
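                # Assumption: image content items over the realtime WebSocket may
                # not be accepted by every API version; the REST branch below is
                # the standard chat-completions path for vision requests.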
message = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": question
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}",
"detail": "high"
}
}
]
}
}
await self.websocket.send(json.dumps(message))
# Trigger response
await self.websocket.send(json.dumps({
"type": "response.create"
}))
# Stream response
async for response in self.stream_response():
yield response
else:
# Use REST API
response = self.rest_client.chat.completions.create(
model=self.config.deployment_name,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": question},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}",
"detail": "high"
}
}
]
}
],
max_tokens=self.config.max_tokens,
temperature=self.config.temperature
)
yield {
"type": ResponseType.TEXT,
"data": response.choices[0].message.content
}
except Exception as e:
logger.error(f"❌ Image analysis error: {str(e)}")
yield {
"type": ResponseType.ERROR,
"data": {"message": str(e)}
}
# ==================== CLEANUP ====================
async def disconnect(self):
"""Disconnect realtime session"""
if self.websocket:
await self.websocket.close()
self.session_active = False
logger.info("βœ… Disconnected from realtime session")
# ========================= GRADIO INTERFACE =========================
class UnifiedGradioInterface:
"""Enhanced Gradio interface for the unified assistant"""
def __init__(self, assistant: GPT4oRealtimeAssistant):
self.assistant = assistant
self.conversation_history = []
self.audio_processor = AudioProcessor()
def create_interface(self) -> gr.Blocks:
"""Create the complete Gradio interface"""
with gr.Blocks(
theme=gr.themes.Soft(),
title="Azure GPT-4o Realtime Assistant",
css=self._get_custom_css()
) as interface:
# Header
gr.Markdown(
"""
# 🌟 Azure GPT-4o Realtime Assistant
**Unified Model with All Capabilities:**
- πŸŽ™οΈ **Voice:** Realtime voice conversations with ~300ms latency
- ⚑ **Streaming:** Live text streaming responses
- πŸ“š **Documents:** Vector search and Q&A on your documents
- πŸ–ΌοΈ **Vision:** Image analysis and understanding
- πŸ”§ **Functions:** Tool calling and code interpretation
**Model:** `gpt-4o-realtime-preview-2024-10-01` | **Context:** 128K tokens
"""
)
# State management
state = gr.State({
"session_active": False,
"conversation_id": None
})
with gr.Tabs() as tabs:
# ============ TAB 1: STREAMING CHAT ============
with gr.Tab("πŸ’¬ Streaming Chat", elem_id="chat-tab"):
with gr.Row():
with gr.Column(scale=4):
chatbot = gr.Chatbot(
height=500,
show_label=False,
elem_id="chatbot",
bubble_full_width=False
)
with gr.Row():
msg = gr.Textbox(
placeholder="Type your message here... (Press Enter to send)",
label="Message",
scale=4,
lines=2
)
with gr.Column(scale=1):
send_btn = gr.Button("Send", variant="primary", size="lg")
clear_btn = gr.Button("Clear", variant="secondary")
with gr.Column(scale=1):
gr.Markdown("### Settings")
streaming_mode = gr.Radio(
choices=["Stream", "Complete"],
value="Stream",
label="Response Mode"
)
temperature = gr.Slider(
minimum=0,
maximum=1,
value=0.7,
step=0.1,
label="Temperature"
)
gr.Markdown("### Status")
status_display = gr.Markdown("🟒 Ready")
# Chat handlers
async def handle_chat(message, history, mode):
                        if not message:
                            yield history
                            return
history = history or []
history.append([message, ""])
try:
if mode == "Stream":
response = ""
# Stream text response
await self.assistant.send_text(message, use_realtime=True)
async for chunk in self.assistant.stream_response():
if chunk["type"] == ResponseType.TEXT:
response += chunk["data"]
history[-1][1] = response
yield history
elif chunk["type"] == ResponseType.DONE:
break
else:
# Complete response
result = await self.assistant.send_text(message, use_realtime=False)
history[-1][1] = result.choices[0].message.content
yield history
except Exception as e:
history[-1][1] = f"❌ Error: {str(e)}"
yield history
msg.submit(
handle_chat,
[msg, chatbot, streaming_mode],
chatbot
).then(
lambda: "",
outputs=msg
)
send_btn.click(
handle_chat,
[msg, chatbot, streaming_mode],
chatbot
).then(
lambda: "",
outputs=msg
)
clear_btn.click(
lambda: ([], ""),
outputs=[chatbot, msg]
)
# ============ TAB 2: VOICE INTERACTION ============
with gr.Tab("πŸŽ™οΈ Voice Chat", elem_id="voice-tab"):
gr.Markdown(
"""
### Realtime Voice Interaction
Talk naturally with the assistant using your microphone.
"""
)
with gr.Row():
with gr.Column():
# Connection control
with gr.Row():
connect_btn = gr.Button("πŸ”Œ Connect Voice", variant="primary")
disconnect_btn = gr.Button("πŸ”Œ Disconnect", variant="stop")
connection_status = gr.Markdown("βšͺ Not connected")
# Audio interface
audio_input = gr.Audio(
sources=["microphone"],
type="numpy",
streaming=True,
label="🎀 Speak to Assistant"
)
audio_output = gr.Audio(
label="πŸ”Š Assistant Response",
type="numpy",
autoplay=True
)
with gr.Column():
# Conversation display
transcript = gr.Textbox(
label="Conversation Transcript",
lines=20,
max_lines=30,
interactive=False
)
# Voice settings
voice_select = gr.Dropdown(
choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
value="alloy",
label="Voice Selection"
)
# Voice handlers
async def connect_voice():
try:
await self.assistant.connect_realtime()
return "🟒 Connected to realtime voice"
except Exception as e:
return f"πŸ”΄ Connection failed: {str(e)}"
async def disconnect_voice():
try:
await self.assistant.disconnect()
return "βšͺ Disconnected"
except Exception as e:
return f"πŸ”΄ Error: {str(e)}"
async def process_voice(audio_data):
if audio_data is None:
return None, ""
                        try:
                            # Gradio may deliver float [-1, 1] or int16 samples;
                            # normalize to PCM16 bytes either way.
                            sample_rate, audio = audio_data
                            if audio.dtype != np.int16:
                                audio = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
                            audio_bytes = audio.tobytes()
# Send audio
await self.assistant.send_audio(audio_bytes)
# Collect responses
transcript_text = ""
audio_chunks = []
async for response in self.assistant.stream_response():
if response["type"] == ResponseType.AUDIO:
audio_chunks.append(response["data"])
elif response["type"] == ResponseType.TRANSCRIPT:
transcript_text += response["data"]
elif response["type"] == ResponseType.DONE:
break
# Process audio output
if audio_chunks:
combined = b''.join(audio_chunks)
audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0
return (24000, audio_array), transcript_text
return None, transcript_text
except Exception as e:
return None, f"Error: {str(e)}"
connect_btn.click(
connect_voice,
outputs=connection_status
)
disconnect_btn.click(
disconnect_voice,
outputs=connection_status
)
audio_input.stream(
process_voice,
audio_input,
[audio_output, transcript]
)
# ============ TAB 3: DOCUMENT Q&A ============
with gr.Tab("πŸ“š Document Q&A", elem_id="doc-tab"):
gr.Markdown(
"""
### Query Your Documents
Ask questions about documents in your vector storage.
"""
)
with gr.Row():
with gr.Column():
doc_query = gr.Textbox(
placeholder="Ask a question about your documents...",
label="Query",
lines=3
)
with gr.Row():
search_btn = gr.Button("πŸ” Search", variant="primary")
stream_toggle = gr.Checkbox(label="Stream Response", value=True)
doc_response = gr.Textbox(
label="Response",
lines=15,
max_lines=25
)
with gr.Column(scale=1):
gr.Markdown("### Vector Store Info")
vector_status = gr.Markdown(
f"**Store ID:** `{self.assistant.config.vector_store_id or 'Not configured'}`"
)
gr.Markdown(
"""
### Tips
- Be specific in your questions
- Reference document sections if known
- Ask for citations when needed
"""
)
# Document Q&A handler
async def search_documents(query, stream):
                        if not query:
                            yield "Please enter a query."
                            return
                        try:
                            if stream:
                                response = ""
                                # query_documents yields incremental chunks in streaming mode
                                async for chunk in self.assistant.query_documents(query, stream=True):
                                    response += chunk
                                    yield response
                            else:
                                # non-streaming mode yields a single final result
                                response = ""
                                async for chunk in self.assistant.query_documents(query, stream=False):
                                    response = chunk
                                yield response
except Exception as e:
yield f"❌ Error: {str(e)}"
search_btn.click(
search_documents,
[doc_query, stream_toggle],
doc_response
)
doc_query.submit(
search_documents,
[doc_query, stream_toggle],
doc_response
)
# ============ TAB 4: IMAGE ANALYSIS ============
with gr.Tab("πŸ–ΌοΈ Vision", elem_id="vision-tab"):
gr.Markdown(
"""
### Image Analysis with Voice
Upload an image and ask questions about it.
"""
)
with gr.Row():
with gr.Column():
image_input = gr.Image(
type="filepath",
label="Upload Image"
)
image_query = gr.Textbox(
placeholder="What would you like to know about this image?",
label="Question",
lines=2
)
analyze_btn = gr.Button("πŸ” Analyze", variant="primary")
with gr.Column():
image_response = gr.Textbox(
label="Analysis",
lines=10
)
image_audio = gr.Audio(
label="Voice Response",
type="numpy"
)
# Image analysis handler
async def analyze_image(image, query):
if not image or not query:
return "Please provide both an image and a question.", None
try:
text_response = ""
audio_chunks = []
async for response in self.assistant.analyze_image(image, query):
if response["type"] == ResponseType.TEXT:
text_response += response["data"]
elif response["type"] == ResponseType.AUDIO:
audio_chunks.append(response["data"])
# Process audio if available
audio_output = None
if audio_chunks:
combined = b''.join(audio_chunks)
audio_array = np.frombuffer(combined, dtype=np.int16) / 32767.0
audio_output = (24000, audio_array)
return text_response, audio_output
except Exception as e:
return f"❌ Error: {str(e)}", None
analyze_btn.click(
analyze_image,
[image_input, image_query],
[image_response, image_audio]
)
# ============ TAB 5: SETTINGS ============
with gr.Tab("βš™οΈ Settings", elem_id="settings-tab"):
gr.Markdown("### Model Configuration")
with gr.Row():
with gr.Column():
gr.Markdown("#### Model Information")
model_info = gr.JSON(
value={
"model": self.assistant.config.model_name,
"deployment": self.assistant.config.deployment_name,
"context_window": 128000,
"capabilities": [
"realtime_audio",
"streaming_text",
"document_qa",
"vision",
"function_calling"
],
"audio_format": self.assistant.config.audio_format,
"sample_rate": self.assistant.config.sample_rate
},
label="Current Configuration"
)
with gr.Column():
gr.Markdown("#### Performance Metrics")
metrics = gr.JSON(
value={
"voice_latency": "~300ms",
"text_streaming": "~50ms/token",
"document_search": "1-2 seconds",
"image_analysis": "2-3 seconds"
},
label="Expected Performance"
)
# Footer
gr.Markdown(
"""
---
<center>
**Azure GPT-4o Realtime** | Single Model for All Tasks |
[Documentation](https://docs.microsoft.com/azure/cognitive-services/openai/)
</center>
"""
)
return interface
def _get_custom_css(self) -> str:
"""Get custom CSS for the interface"""
return """
#chatbot {
height: 500px !important;
}
.gradio-container {
max-width: 1400px !important;
margin: auto !important;
}
#voice-tab .gradio-audio {
height: 120px !important;
}
.markdown-text {
font-size: 14px !important;
}
"""
# ========================= AUDIO PROCESSOR =========================
class AudioProcessor:
"""Handle audio format conversions and processing"""
@staticmethod
def numpy_to_pcm16(audio_array: np.ndarray) -> bytes:
"""Convert numpy array to PCM16 bytes"""
return (audio_array * 32767).astype(np.int16).tobytes()
@staticmethod
def pcm16_to_numpy(audio_bytes: bytes) -> np.ndarray:
"""Convert PCM16 bytes to numpy array"""
return np.frombuffer(audio_bytes, dtype=np.int16) / 32767.0
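# The two helpers above are inverses for 16-bit PCM audio (assuming float input
# in [-1, 1] at the session's 24 kHz mono format); e.g.
#   AudioProcessor.pcm16_to_numpy(AudioProcessor.numpy_to_pcm16(x))
# round-trips x up to quantization error.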
# ========================= MAIN APPLICATION =========================
async def main():
"""Main application entry point"""
# Load configuration from environment
config = UnifiedAssistantConfig(
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version=os.getenv("AZURE_API_VERSION", "2024-10-01-preview"),
deployment_name=os.getenv("AZURE_DEPLOYMENT_NAME", "gpt-4o-realtime-preview"),
vector_store_id=os.getenv("AZURE_VECTOR_STORE_ID"),
voice=os.getenv("AZURE_VOICE", "alloy"),
temperature=float(os.getenv("TEMPERATURE", "0.7")),
max_tokens=int(os.getenv("MAX_TOKENS", "4096"))
)
# Validate configuration
if not config.azure_endpoint or not config.api_key:
logger.error("❌ Missing required environment variables")
raise ValueError(
"Please set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY environment variables"
)
if not config.vector_store_id:
logger.warning("⚠️ No vector store ID configured - document Q&A will be limited")
try:
# Initialize assistant
logger.info("πŸš€ Initializing GPT-4o Realtime Unified Assistant...")
assistant = GPT4oRealtimeAssistant(config)
# Create Gradio interface
logger.info("🎨 Creating Gradio interface...")
interface_manager = UnifiedGradioInterface(assistant)
interface = interface_manager.create_interface()
# Launch application
logger.info("βœ… Launching application...")
interface.queue(max_size=100) # Enable queuing for concurrent users
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=True, # Create public URL
show_error=True,
quiet=False
)
except KeyboardInterrupt:
logger.info("πŸ‘‹ Shutting down gracefully...")
if assistant.session_active:
await assistant.disconnect()
except Exception as e:
logger.error(f"❌ Fatal error: {str(e)}")
raise
def run():
"""Synchronous entry point"""
asyncio.run(main())
if __name__ == "__main__":
run()