Update app.py

app.py CHANGED
@@ -17,7 +17,12 @@ from io import BytesIO
import numpy as np

# --- CONFIGURATION & INITIALIZATION ---
-
+# Use string 'cpu' or GPU index string/int like '0'
+USER_DEVICE = "cpu"  # keep as "cpu" on CPU-only hosts; change to "0" for GPU 0
+# convert to values pipelines expect
+PIPELINE_DEVICE = -1 if str(USER_DEVICE).lower() == "cpu" else int(USER_DEVICE)
+TORCH_DEVICE = torch.device("cuda") if torch.cuda.is_available() and PIPELINE_DEVICE != -1 else torch.device("cpu")
+
os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
AUDIO_DIR = "audio_outputs"
DOC_DIR = "doc_outputs"
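A quick aside on the two device values added above, since they follow different conventions: `transformers.pipeline` takes an integer index (`-1` meaning CPU, `0` the first GPU), while torch/diffusers modules take a `torch.device`. A minimal, illustrative sketch of that mapping (not part of the committed file):

import torch
from transformers import pipeline

USER_DEVICE = "cpu"  # or "0" to target the first GPU
PIPELINE_DEVICE = -1 if str(USER_DEVICE).lower() == "cpu" else int(USER_DEVICE)
TORCH_DEVICE = torch.device("cuda") if torch.cuda.is_available() and PIPELINE_DEVICE != -1 else torch.device("cpu")

# transformers pipelines consume the integer form ...
asr = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=PIPELINE_DEVICE)
# ... while torch modules are moved with the torch.device form
print(asr.device, TORCH_DEVICE)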
@@ -25,20 +30,22 @@ if not os.path.exists(AUDIO_DIR):
    os.makedirs(AUDIO_DIR)
if not os.path.exists(DOC_DIR):
    os.makedirs(DOC_DIR)
+
REPO_ID = "cosmosai471/Luna-v3"
MODEL_FILE = "luna.gguf"
LOCAL_MODEL_PATH = MODEL_FILE
+
+# Note: model is expected to prefix outputs with [Intent: ...][Confidence: ...] but user wants those strictly hidden.
SYSTEM_PROMPT = (
-    "You are Luna, a helpful and friendly AI assistant.
-    "
-    "Your full response must follow these tags."
+    "You are Luna, a helpful and friendly AI assistant. For internal tracing you may place Intent/Confidence tags, "
+    "but DO NOT expose these tags in the user-facing response. Any Intent/Confidence/Action metadata must be kept internal."
)

# --- TUNABLES / GUARDS ---
-CONFIDENCE_THRESHOLD = 30  #
-STREAM_CHAR_LIMIT = 35000  #
-STREAM_ITER_LIMIT = 20000  #
-MIN_MEANINGFUL_LENGTH = 20  #
+CONFIDENCE_THRESHOLD = 30  # trigger web-search fallback only under this confidence
+STREAM_CHAR_LIMIT = 35000  # cap streaming characters
+STREAM_ITER_LIMIT = 20000  # cap streaming iterations
+MIN_MEANINGFUL_LENGTH = 20  # min length for file-generation prompts

def safe_del(self):
    try:

@@ -69,71 +76,107 @@ except Exception as e:
    print(f"❌ Error loading Luna model: {e}")
    class DummyLLM:
        def create_completion(self, *args, **kwargs):
-            # yield one piece to mimic streaming
            yield {'choices': [{'text': '[Intent: qa_general][Confidence: 0] ERROR: Luna model failed to load. Check logs and resources.'}]}
    llm = DummyLLM()

+# transformer's pipeline expects device int: -1 for CPU
stt_pipe = None
try:
-    stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=
-    print(f"✅ Loaded Whisper-base on device: {
+    stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=PIPELINE_DEVICE)
+    print(f"✅ Loaded Whisper-base on device: {USER_DEVICE}")
except Exception as e:
    print(f"⚠️ Could not load Whisper. Voice chat disabled. Error: {e}")

image_pipe = None
try:
    VLM_MODEL_ID = "llava-hf/llava-1.5-7b-hf"
-    image_pipe = pipeline("image-to-text", model=VLM_MODEL_ID, device=
-    print(f"✅ Loaded {VLM_MODEL_ID} for image processing.")
+    image_pipe = pipeline("image-to-text", model=VLM_MODEL_ID, device=PIPELINE_DEVICE)
+    print(f"✅ Loaded {VLM_MODEL_ID} for image processing (device={USER_DEVICE}).")
except Exception as e:
    print(f"⚠️ Could not load VLM ({VLM_MODEL_ID}). Image chat disabled. Error: {e}")

img_gen_pipe = None
try:
    img_gen_pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float32)
-    img_gen_pipe.to(
-    print("✅ Loaded Stable Diffusion
+    img_gen_pipe.to(TORCH_DEVICE)
+    print(f"✅ Loaded Stable Diffusion and moved to {TORCH_DEVICE}.")
except Exception as e:
    print(f"⚠️ Could not load Image Generation pipeline. Image generation disabled. Error: {e}")


-# ---
+# --- STRICT SANITIZERS & UTILITIES ---

def simulate_recording_delay():
    time.sleep(3)
    return None

-def
-    """Remove
-    This ensures tags never leak into the UI.
-    """
+def remove_bracketed_tags(text: str) -> str:
+    """Remove bracketed tags like [Intent: ...] [Confidence: ...] exactly."""
    if not text:
        return ""
-    # remove bracketed tags like [Intent: xyz] [Confidence: 85]
    text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\[Confidence:\s*\d{1,3}\]', '', text, flags=re.IGNORECASE)
-
-    text = re.sub(r'(?im)^\s*Intent:\s*.*$', '', text)
-    text = re.sub(r'(?im)^\s*Confidence:\s*.*$', '', text)
-    # collapse multiple blank lines
-    text = re.sub(r'\n\s*\n+', '\n\n', text).strip()
+    text = re.sub(r'\[Action:\s*[^\]]+\]', '', text, flags=re.IGNORECASE)
    return text

-def
-    """
-    if not
+def remove_plain_labels_lines(text: str) -> str:
+    """Remove plain lines or labels like 'Intent : ...', 'Confidence: 99', 'Action: ...'."""
+    if not text:
+        return ""
+    text = re.sub(r'(?im)^\s*Intent\s*[:\-]\s*.*$', '', text)
+    text = re.sub(r'(?im)^\s*Confidence\s*[:\-]\s*.*$', '', text)
+    text = re.sub(r'(?im)^\s*Action\s*[:\-]\s*.*$', '', text)
+    return text
+
+def remove_word_number_patterns(text: str) -> str:
+    """
+    Remove patterns like 'greeting 99 2. goodbye 99' or 'greeting 99' or 'label 100 0'.
+    This aggressively strips sequences of word tokens followed by small-digit numbers.
+    """
+    if not text:
        return ""
-    #
-
-    #
-
-
-    #
-
-
-
-
+    # Pattern: word (letters, underscore, hyphen) followed by one or more numbers (1-3 digits),
+    # possibly repeated and possibly followed by punctuation.
+    # We will repeatedly remove such occurrences until nothing matches to avoid chained dumps.
+    pattern = re.compile(r'\b[a-zA-Z_\-]{2,40}\b(?:\s+\d{1,3}\b)+', flags=re.IGNORECASE)
+    prev = None
+    new = text
+    # iterative removal to handle multiple occurrences
+    while prev != new:
+        prev = new
+        new = pattern.sub('', new)
+    # also remove isolated numeric sequences that remain on their own
+    new = re.sub(r'\b\d{1,3}(?:\s+\d{1,3})*\b', '', new)
+    return new
+
+def collapse_whitespace_and_punct(text: str) -> str:
+    if not text:
+        return ""
+    # collapse multiple newlines and trim whitespace
+    text = re.sub(r'\n\s*\n+', '\n\n', text)
+    # remove excessive spaces
+    text = re.sub(r'[ \t]{2,}', ' ', text)
+    # trim leading/trailing
+    return text.strip()
+
+def strict_sanitize_for_ui(raw: str) -> str:
+    """
+    The final strict sanitizer that ensures NOTHING resembling Intent/Confidence/Action/
+    word-number dumps reaches the UI.
+    """
+    if not raw:
+        return ""
+    s = raw
+    s = remove_bracketed_tags(s)
+    s = remove_plain_labels_lines(s)
+    s = remove_word_number_patterns(s)
+    s = collapse_whitespace_and_punct(s)
+    # final guard to remove leftover tokens like 'Intent' or 'Confidence' anywhere
+    s = re.sub(r'(?i)\bIntent\b', '', s)
+    s = re.sub(r'(?i)\bConfidence\b', '', s)
+    s = re.sub(r'(?i)\bAction\b', '', s)
+    s = collapse_whitespace_and_punct(s)
+    return s.strip()

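Illustrative only, assuming the sanitizer chain is defined exactly as in this hunk — a tiny check of what it is meant to guarantee for a raw, tag-prefixed model chunk:

raw = "[Intent: qa_general][Confidence: 87] Here is the answer.\nConfidence: 87"
print(strict_sanitize_for_ui(raw))  # -> "Here is the answer."  (bracketed tags and plain label lines are stripped)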
def web_search_tool(query: str) -> str:
|
| 182 |
time.sleep(1.2)
|
|
|
|
| 184 |
return f"\n\n🌐 **Web Search Results for '{query}':** I found supplemental info to help answer this."
|
| 185 |
|
| 186 |
def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
|
| 187 |
+
"""
|
| 188 |
+
Internal: parse confidence if present (for logic only), but never display it. If fallback triggered,
|
| 189 |
+
append web results to sanitized response.
|
| 190 |
+
"""
|
| 191 |
+
cleaned_for_logic = remove_bracketed_tags(raw_response_with_tags) # keep for length heuristic
|
| 192 |
+
# extract confidence if any (internal only)
|
| 193 |
confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
|
|
|
|
|
|
|
| 194 |
if confidence_match:
|
| 195 |
try:
|
| 196 |
confidence_score = int(confidence_match.group(1))
|
|
|
|
| 198 |
except Exception:
|
| 199 |
confidence_score = 0
|
| 200 |
else:
|
| 201 |
+
# heuristic
|
| 202 |
+
cleaned_no_tags = strict_sanitize_for_ui(cleaned_for_logic)
|
| 203 |
+
confidence_score = 10 if not cleaned_no_tags or len(cleaned_no_tags) < 30 else 85
|
| 204 |
|
| 205 |
+
# If low, augment with web snippet; final output is always sanitized
|
| 206 |
if confidence_score < CONFIDENCE_THRESHOLD:
|
| 207 |
+
print(f"[internal] Low confidence ({confidence_score}%) detected -> using web fallback")
|
| 208 |
+
supplement = web_search_tool(prompt)
|
| 209 |
+
out = strict_sanitize_for_ui(cleaned_for_logic)
|
| 210 |
+
if not out:
|
| 211 |
+
out = "I couldn't generate a reliable answer. " + strict_sanitize_for_ui(supplement)
|
| 212 |
else:
|
| 213 |
+
out = out + "\n\n" + strict_sanitize_for_ui(supplement)
|
| 214 |
else:
|
| 215 |
+
out = strict_sanitize_for_ui(cleaned_for_logic)
|
| 216 |
|
| 217 |
+
# final guard: don't return empty
|
| 218 |
+
out = out or "Sorry — I couldn't produce a good answer. Could you rephrase or give more details?"
|
| 219 |
+
return out
|
| 220 |
|
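Again illustrative, assuming the constants and functions above: a reply tagged below CONFIDENCE_THRESHOLD picks up the web-search supplement, while a confident one is returned as-is, with the tags stripped in both cases:

low = "[Intent: qa_general][Confidence: 12] I am not sure."
high = "[Intent: qa_general][Confidence: 91] Paris is the capital of France."
print(check_confidence_and_augment(low, "capital of France"))   # sanitized text plus the web-search block
print(check_confidence_and_augment(high, "capital of France"))  # -> "Paris is the capital of France."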
-
-    """Perform VQA via the image_pipe. Robust to different pipeline return types.
+# --- IMAGE / VQA PROCESSING (robust) ---

-
-
-
+def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
+    """
+    Uses image_pipe to produce VQA text. Returns prompt-injection (safe) + success flag.
+    This function is robust to multiple pipeline return shapes.
    """
    global image_pipe
    success = False
    if image_pipe is None:
-        return f"[Image Processing Error: VLM model not loaded.] **User Query:** {message}",
+        return f"[Image Processing Error: VLM model not loaded.] **User Query:** {message}", False

    image = None
    try:

@@ -190,22 +237,21 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
        elif isinstance(image_data_or_path, np.ndarray):
            image = Image.fromarray(image_data_or_path).convert("RGB")
        else:
-            #
+            # bytes or file-like
            try:
                image = Image.open(BytesIO(image_data_or_path)).convert("RGB")
            except Exception:
                image = None

        if image is None:
-            return f"[Image Processing Error: Could not open image.] **User Query:** {message}",
+            return f"[Image Processing Error: Could not open image.] **User Query:** {message}", False

        vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
-        #
+        # defensive calls: some pipelines accept prompt kwarg, some don't
        results = None
        try:
            results = image_pipe(image, prompt=vqa_prompt)
        except TypeError:
-            # fallback signature
            try:
                results = image_pipe(image)
            except Exception as e:

@@ -215,52 +261,50 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
                print(f"Image pipeline call error: {e}")
                results = None

-
+        raw_text = ""
        if results is None:
-
+            raw_text = ""
        elif isinstance(results, dict):
-
-            raw_vlm_output = results.get('generated_text') or results.get('text') or ""
+            raw_text = results.get("generated_text") or results.get("text") or ""
        elif isinstance(results, list):
-            # list of dicts or strings
            first = results[0]
            if isinstance(first, dict):
-
+                raw_text = first.get("generated_text") or first.get("text") or ""
            elif isinstance(first, str):
-
+                raw_text = first
        elif isinstance(results, str):
-
+            raw_text = results
        else:
-            # unknown shape -> convert to string safe
            try:
-
+                raw_text = str(results)
            except Exception:
-
+                raw_text = ""

-        #
-        vqa_response =
+        # pick assistant section if available
+        vqa_response = raw_text.split("ASSISTANT:")[-1].strip() if raw_text else ""
+        vqa_response = strict_sanitize_for_ui(vqa_response)

-        # If no meaningful vqa_response, return a helpful fallback message
        if not vqa_response or len(vqa_response) < 10:
            vqa_response = (
-                "VQA analysis
-                "Please
+                "VQA analysis didn't return a clear answer. The image might be unclear or the question ambiguous. "
+                "Please re-upload a clearer image, crop to the subject, or give a short instruction about what you'd like answered."
            )
            success = False
        else:
            success = True

-        #
-
-        prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {message}"
+        # Return safe, sanitized prompt injection for LLM
+        prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {strict_sanitize_for_ui(message)}"
        return prompt_injection, success

    except Exception as e:
-        print(f"Image
-        return f"[Image Processing Error: {e}] **User Query:** {message}",
+        print(f"Image processing exception: {e}")
+        return f"[Image Processing Error: {e}] **User Query:** {strict_sanitize_for_ui(message)}", False
+
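One illustrative shape, since the normalization above has to cope with several: image-to-text pipelines commonly return a list of dicts carrying a 'generated_text' field, which the list branch and the ASSISTANT: split reduce to the answer text. The values below are made up:

results = [{"generated_text": "USER:  \nWhat is in the photo?\nASSISTANT: a red bicycle leaning against a wall"}]
first = results[0]
raw_text = first.get("generated_text") or first.get("text") or ""
print(raw_text.split("ASSISTANT:")[-1].strip())  # -> "a red bicycle leaning against a wall"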
+# --- AUDIO / TTS ---

def transcribe_audio(audio_file_path: str) -> Tuple[str, str, gr.update, gr.update, bool, gr.update]:
-    if stt_pipe is None or audio_file_path
+    if stt_pipe is None or not audio_file_path:
        error_msg = "Error: Whisper model failed to load or no audio recorded."
        return "", error_msg, gr.update(interactive=True), gr.update(value="↑", interactive=True, elem_classes=["circle-btn", "send-mode"]), False, gr.update(visible=False)
    try:

@@ -293,6 +337,7 @@ def text_to_audio(text: str, is_voice_chat: bool) -> str or None:
        return None
    return None

+# --- INTENT STATUS MAP (internal only) ---
INTENT_STATUS_MAP = {
    "code_generate": "Analyzing requirements and drafting code 💻...",
    "code_explain": "Reviewing code logic and writing explanation 💡...",

@@ -308,21 +353,22 @@ INTENT_STATUS_MAP = {
}

def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
-    """
-
+    """
+    Internal parsing: returns (intent, status, cleaned_display_text).
+    cleaned_display_text is strictly sanitized for UI (no tags, no word-number dumps).
    """
    intent_match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
    intent = intent_match.group(1).lower() if intent_match else "default"
    if is_vqa_flow:
        intent = "vqa"

-    #
-    cleaned_text =
+    # produce sanitized display text
+    cleaned_text = strict_sanitize_for_ui(raw_response)
    status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
    return intent, status, cleaned_text

+# --- FILE / IMAGE GENERATION ---
def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
-    """Generates a file and writes it to disk. If content insufficient, asks for clarification."""
    file_path = None
    try:
        if not content or len(content.strip()) < MIN_MEANINGFUL_LENGTH:

@@ -370,15 +416,14 @@ def generate_file_content(content: str, history: List[Dict[str, str]], file_type
        file_path = None
    return history, file_path

-
# --- CORE GENERATOR FUNCTION ---
def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
    """
-    -
-    -
-    -
+    - assistant entry is appended only when generation actually starts (no empty box).
+    - streaming is strictly sanitized at every step.
+    - intent/confidence/action NEVER reach UI (aggressively stripped).
    """
-    #
+    # Expect last entry to be user
    if not history or history[-1]['role'] != 'user':
        yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return

@@ -386,10 +431,10 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    last_user_index = len(history) - 1
    original_message = history[last_user_index]['content'] or ""

-    #
+    # detect VQA flow
    is_vqa_flow = False
    if isinstance(image_input_data, str):
-        is_vqa_flow = image_input_data
+        is_vqa_flow = bool(image_input_data)
    elif isinstance(image_input_data, np.ndarray):
        is_vqa_flow = image_input_data.size > 0
    else:

@@ -399,29 +444,28 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    llm_input_message = original_message
    if is_vqa_flow:
        processed_message, vqa_success = process_image(image_input_data, original_message)
-        #
-        history[last_user_index]['content'] = f"[IMAGE RECEIVED] {original_message}"
+        # annotate user's recorded message but preserve original for logic
+        history[last_user_index]['content'] = f"[IMAGE RECEIVED] {strict_sanitize_for_ui(original_message)}"
        llm_input_message = processed_message

-    #
+    # build prompt
    prompt = f"SYSTEM: {SYSTEM_PROMPT}\n"
    for item in history[:-1]:
        role = item['role'].upper()
        content = item['content'] or ""
        if role == "ASSISTANT":
-            # ensure assistant content used in prompt still includes tags if model expects them (we don't alter)
            prompt += f"LUNA: {content}\n"
        elif role == "USER":
            prompt += f"USER: {content}\n"
    prompt += f"USER: {llm_input_message}\nLUNA: "

-    #
+    # append assistant entry only now
    assistant_initial_text = "✨ Luna is starting to think..."
    history.append({"role": "assistant", "content": assistant_initial_text})

-    #
+    # initial UI hint
    yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
-    time.sleep(0.
+    time.sleep(0.12)

    full_response = ""
    current_intent = "default"

@@ -434,19 +478,18 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
            echo=False, stream=True, temperature=0.7
        )
    except Exception as e:
-
-        history[-1]['content'] =
-        yield history, False,
+        err = f"❌ Error generating response: {e}"
+        history[-1]['content'] = strict_sanitize_for_ui(err)
+        yield history, False, err, gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return

-    #
+    # stream tokens; strictly sanitize and cap
    try:
        for output in stream:
            iter_count += 1
            if iter_count > STREAM_ITER_LIMIT:
-
-
-                print("Stream aborted: iteration limit reached.")
+                full_response += "\n\n[Stream aborted: iteration limit reached]"
+                print("Stream aborted by iter limit.")
                break

            token = output["choices"][0].get("text", "")

@@ -454,41 +497,36 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
                token = str(token)
            full_response += token

-            # safety cap on characters to prevent runaway printing
            if len(full_response) > STREAM_CHAR_LIMIT:
-                full_response = full_response[:STREAM_CHAR_LIMIT] + "\n\n[Truncated:
-                print("Stream truncated
+                full_response = full_response[:STREAM_CHAR_LIMIT] + "\n\n[Truncated: length limit reached]"
+                print("Stream truncated by char limit.")
                break

-            #
-            current_intent, current_hint,
-
-
-
-
-
-            cleaned_display = "✨ Luna is forming a reply..."
-            history[-1]['content'] = cleaned_display
+            # extract intent/status for internal UX hints but NEVER show tags
+            current_intent, current_hint, interim = get_intent_status(full_response, is_vqa_flow and vqa_success)
+            # ALWAYS sanitize interim for UI
+            interim_ui = strict_sanitize_for_ui(interim)
+            if not interim_ui:
+                interim_ui = "✨ Luna is forming a reply..."
+            history[-1]['content'] = interim_ui

            yield history, stop_signal, current_hint, gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()

    except Exception as e:
-
-
-
-        history[-1]['content'] = final_clean
+        _, _, salvage = get_intent_status(full_response, is_vqa_flow and vqa_success)
+        salvage_ui = strict_sanitize_for_ui(salvage) or f"⚠️ Streaming interrupted: {e}"
+        history[-1]['content'] = salvage_ui
        yield history, False, f"⚠️ Streaming interrupted: {e}", gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
        return

-    #
+    # post-process
    file_download_path = None
    _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
-    content_for_tool =
+    content_for_tool = strict_sanitize_for_ui(content_for_tool)

-    # Handle tool intents, but require sufficient content; otherwise ask for clarification
    if current_intent == "image_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
-            history[-1]['content'] = "I detected an image generation
+            history[-1]['content'] = "I detected an image generation request but didn't get enough details. Please give a short description (e.g. 'red bicycle at sunrise, vivid colors')."
        else:
            history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
            yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()

@@ -504,47 +542,41 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List

    elif current_intent == "ppt_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
-            history[-1]['content'] = "I can make a presentation — please give a title and 3–5 bullet points."
+            history[-1]['content'] = "I can make a short presentation — please give a title and 3–5 bullet points."
        else:
            history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
            yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
            history, file_download_path = generate_file_content(content_for_tool, history, "ppt")

    elif current_intent == "open_google":
-
-        history[-1]['content'] =
+        final_text = (content_for_tool or "").strip() + "\n\n🔗 **Action:** [Search Google](https://www.google.com/search?q=" + re.sub(r'\s+', '+', strict_sanitize_for_ui(original_message)) + ")"
+        history[-1]['content'] = strict_sanitize_for_ui(final_text)

    elif current_intent == "open_camera":
-
-        history[-1]['content'] =
+        final_text = (content_for_tool or "").strip() + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
+        history[-1]['content'] = strict_sanitize_for_ui(final_text)

    else:
-        #
+        # normal path: evaluate confidence and maybe augment
        final_response_content = check_confidence_and_augment(full_response, original_message)
        history[-1]['content'] = final_response_content

-    #
+    # final defensive fallback
    if not history[-1]['content'] or not str(history[-1]['content']).strip():
-        history[-1]['content'] = "Sorry — I couldn't produce a
+        history[-1]['content'] = "Sorry — I couldn't produce a useful response. Could you rephrase or add details?"

-    # convert to audio if requested
    audio_file_path = text_to_audio(history[-1]['content'], is_voice_chat)

    hint = "✅ Response generated."
    yield history, False, hint, gr.update(interactive=True), gr.update(value="↑", interactive=True), audio_file_path, False, gr.update(visible=True), gr.update(value=None), gr.update(), file_download_path

-
-# --- GRADIO WRAPPERS FOR UI ACTIONS ---
+# --- GRADIO WRAPPERS ---

def toggle_menu(current_visibility: bool) -> Tuple[bool, gr.update, gr.update, gr.update]:
    new_visibility = not current_visibility
    return new_visibility, gr.update(visible=new_visibility), gr.update(visible=False), gr.update(value="⬇️" if new_visibility else "➕")

def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
-    """
-    Appends only the USER message to chat_history. Assistant entry is appended inside chat_generator
-    once generation starts (avoids empty assistant box).
-    """
    has_text = bool(user_message and user_message.strip())
    has_image = False
    if isinstance(staged_image_input, str):

@@ -557,13 +589,12 @@ def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_imag
    if not has_text and not has_image:
        return user_message, chat_history

-    #
+    # prevent double send if assistant currently thinking
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
        return user_message, chat_history

    user_message_to_add = "Analyzing Staged Media." if (not has_text and has_image) else user_message.strip()
-    chat_history.append({"role": "user", "content": user_message_to_add})
-    # DO NOT append assistant here
+    chat_history.append({"role": "user", "content": strict_sanitize_for_ui(user_message_to_add)})
    return "", chat_history

def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:

@@ -586,7 +617,7 @@ def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str
        return history, "Error: Could not find query.", gr.update(visible=False)
    web_results = web_search_tool(last_user_prompt)
    new_history = list(history)
-    new_history[-1]['content'] += web_results
+    new_history[-1]['content'] += "\n\n" + strict_sanitize_for_ui(web_results)
    return new_history, "✅ Double-checked with web facts.", gr.update(visible=False)

def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]], Any, gr.update, gr.update, gr.update, gr.update, gr.update]:

@@ -595,15 +626,14 @@ def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], s
    chat_history[-1]['content'] = "📸 Preparing camera capture..."
    return "", chat_history, staged_image_input, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(value="📸 Capturing in 3 seconds...", interactive=False), gr.update(value="➕")

-
-# --- GRADIO INTERFACE ---
+# --- GRADIO UI ---
with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
    stop_signal = gr.State(value=False)
    is_voice_chat = gr.State(value=False)
    staged_image = gr.State(value=None)
    menu_visible_state = gr.State(value=False)

-    gr.HTML("<h1 style='text-align: center; color: #4B0082;'
+    gr.HTML("<h1 style='text-align: center; color: #4B0082;'>Prototype</h1>")

    hint_box = gr.Textbox(value="Ask anything", lines=1, show_label=False, interactive=False, placeholder="Luna's Action...", visible=True)
    file_download_output = gr.File(label="Generated File", visible=False)

@@ -635,17 +665,13 @@ with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:

    output_components = [chatbot, stop_signal, hint_box, txt, combined_btn, audio_output, is_voice_chat, fact_check_btn_row, staged_image, file_input, file_download_output]

-    #
-    btn_menu.click(
-        fn=toggle_menu, inputs=[menu_visible_state], outputs=[menu_visible_state, menu_options_row, fact_check_btn_row, btn_menu], queue=False
-    )
+    # wiring
+    btn_menu.click(fn=toggle_menu, inputs=[menu_visible_state], outputs=[menu_visible_state, menu_options_row, fact_check_btn_row, btn_menu], queue=False)

    def prepare_file_upload(): return gr.update(visible=False), gr.update(value="➕"), gr.update(visible=False), gr.update(interactive=True), gr.update(value="")
    btn_add_files.click(fn=prepare_file_upload, inputs=[], outputs=[menu_options_row, btn_menu, fact_check_btn_row, file_input, txt], queue=False)

-    file_input.change(
-        fn=stage_file_upload, inputs=[file_input], outputs=[staged_image, hint_box, txt, file_input], queue=False
-    )
+    file_input.change(fn=stage_file_upload, inputs=[file_input], outputs=[staged_image, hint_box, txt, file_input], queue=False)

    btn_take_photo.click(
        fn=lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), "📸 Camera Active. Capture an image.", gr.update(value="➕")),

@@ -677,24 +703,14 @@ with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:

    generator_inputs = [txt, staged_image, chatbot, stop_signal, is_voice_chat]

-    txt.submit(
-        fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
-    ).then(
+    txt.submit(fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False).then(
        fn=chat_generator, inputs=generator_inputs, outputs=output_components, queue=True
-    ).then(
-        fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False
-    )
+    ).then(fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False)

-    combined_btn.click(
-        fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
-    ).then(
+    combined_btn.click(fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False).then(
        fn=chat_generator, inputs=generator_inputs, outputs=output_components, queue=True
-    ).then(
-        fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False
-    )
+    ).then(fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False)

-    btn_fact_check.click(
-        fn=manual_fact_check, inputs=[chatbot], outputs=[chatbot, hint_box, fact_check_btn_row], queue=True
-    )
+    btn_fact_check.click(fn=manual_fact_check, inputs=[chatbot], outputs=[chatbot, hint_box, fact_check_btn_row], queue=True)

    demo.queue(max_size=20).launch(server_name="0.0.0.0")
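The submit/click wiring above relies on Gradio's event chaining: each event returns a dependency whose .then() runs the next step after the previous one finishes (user_turn, then chat_generator, then clear_staged_media). A self-contained toy sketch of the same pattern, with made-up component names rather than the ones in app.py:

import gradio as gr

with gr.Blocks() as toy:
    chat = gr.Chatbot()
    box = gr.Textbox()

    def add_user(msg, history):
        # step 1: record the user turn and clear the textbox (plays the role of user_turn)
        return "", history + [(msg, None)]

    def add_reply(history):
        # step 2: fill in the assistant side (stands in for chat_generator)
        history[-1] = (history[-1][0], "ok")
        return history

    box.submit(add_user, [box, chat], [box, chat], queue=False).then(add_reply, chat, chat)

toy.launch()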