cosmosai471 committed on
Commit bb016b3 · verified · 1 Parent(s): 6ac02a9

Update app.py

Files changed (1)
  1. app.py +204 -129
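In short, this update makes Luna's leading [Intent: ...][Confidence: NN] tags machine-readable while keeping them out of the chat display, adds a confidence-gated web-search fallback, and puts hard caps on token streaming. As a rough standalone sketch of the tag parse-and-strip pattern (the regexes and tag format come from the diff below; parse_tags, strip_tags and the sample text are illustrative names, not part of app.py):

import re

SAMPLE = "[Intent: qa_general][Confidence: 85] Sure, here is a short answer."

def parse_tags(raw: str):
    # First occurrence only, mirroring get_intent_status() in the diff
    intent_m = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw, re.IGNORECASE)
    conf_m = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw, re.IGNORECASE)
    intent = intent_m.group(1).lower() if intent_m else "default"
    confidence = int(conf_m.group(1)) if conf_m else None
    return intent, confidence

def strip_tags(raw: str) -> str:
    # Remove every bracketed tag so nothing tag-like reaches the chat display
    text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', raw, flags=re.IGNORECASE)
    text = re.sub(r'\[Confidence:\s*\d{1,3}\]', '', text, flags=re.IGNORECASE)
    return text.strip()

print(parse_tags(SAMPLE))   # ('qa_general', 85)
print(strip_tags(SAMPLE))   # Sure, here is a short answer.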
app.py CHANGED
@@ -14,7 +14,7 @@ from diffusers import StableDiffusionPipeline
14
  from docx import Document
15
  from pptx import Presentation
16
  from io import BytesIO
17
- import numpy as np
18
 
19
  # --- CONFIGURATION & INITIALIZATION ---
20
  STT_DEVICE = "cpu"
@@ -30,12 +30,15 @@ MODEL_FILE = "luna.gguf"
30
  LOCAL_MODEL_PATH = MODEL_FILE
31
  SYSTEM_PROMPT = (
32
  "You are Luna, a helpful and friendly AI assistant. Your response must begin with two separate "
33
- "tags: an **Intent** tag and a **Confidence** tag (0-100). Example: '[Intent: qa_general][Confidence: 85]'. "
34
  "Your full response must follow these tags."
35
  )
36
 
37
- # Configuration: confidence threshold for triggering web search fallback
38
- CONFIDENCE_THRESHOLD = 30 # only trigger web-search fallback if confidence is less than this
 
 
 
39
 
40
  def safe_del(self):
41
  try:
@@ -100,34 +103,46 @@ def simulate_recording_delay():
100
  time.sleep(3)
101
  return None
102
 
103
- def clean_response_stream(raw_text: str) -> str:
104
- """Cleans up raw response text by removing tags and repeats.
105
- We intentionally DO NOT split on plain words 'Intent' or 'Action' to avoid chopping tags.
106
  """
107
- # Cut at common separators marking model streaming boundaries
108
- clean_text = re.split(r'\nUser:|\nAssistant:|</s>', raw_text, 1)[0].strip()
109
- # Remove bracketed instruction tokens and inline actions
110
- clean_text = re.sub(r'\[/?INST\]|\[/?s\]|\s*<action>.*?</action>\s*', '', clean_text, flags=re.DOTALL).strip()
111
- # Remove tags for display ([Intent: ...], [Confidence: ...]) — keep them for parsing elsewhere
112
- clean_text = re.sub(r'\[Intent:\s*[\w\-\_]+\]|\[Confidence:\s*\d{1,3}\]', '', clean_text, flags=re.IGNORECASE).strip()
113
- # Deduplicate trailing repeated words (simple heuristic)
114
- words = clean_text.split()
115
  if len(words) > 4 and words[-2:] == words[-4:-2]:
116
- clean_text = ' '.join(words[:-2])
117
- return clean_text
118
 
119
  def web_search_tool(query: str) -> str:
120
- time.sleep(1.5)
121
  print(f"Simulating Google Search fallback for: {query}")
122
- return f"\n\n🌐 **Web Search Results for '{query}':** I've gathered information from external sources to supplement my knowledge."
123
 
124
  def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
125
- """Checks confidence from the raw response tag and triggers fallback if very low.
126
-
127
- - If explicit [Confidence: N] exists, use it.
128
- - Otherwise fall back to heuristic based on cleaned response length.
129
- - Only triggers web search if below CONFIDENCE_THRESHOLD.
130
- """
131
  confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
132
  cleaned_response = clean_response_stream(raw_response_with_tags)
133
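The confidence gate this function implements reads simply in isolation: use the explicit [Confidence: N] value when present, otherwise fall back to a length heuristic, and only call the search tool when the score is under CONFIDENCE_THRESHOLD. A minimal sketch of that decision, assuming the values in the diff (threshold 30, heuristic scores 10/85, 30-character cutoff, apology branch for scores <= 5 or "error" text); fake_search and confidence_gate are stand-in names, not part of app.py:

CONFIDENCE_THRESHOLD = 30  # same default as in the diff

def fake_search(query: str) -> str:
    # Stand-in for web_search_tool(): returns a labelled snippet instead of real results
    return f"[web snippet for: {query}]"

def confidence_gate(confidence, cleaned: str, prompt: str) -> str:
    if confidence is None:
        # Length heuristic from the diff: short or empty answers count as low confidence
        confidence = 10 if len(cleaned.strip()) < 30 else 85
    if confidence >= CONFIDENCE_THRESHOLD:
        return cleaned
    snippet = fake_search(prompt)
    if confidence <= 5 or "error" in cleaned.lower():
        return f"I couldn't produce a reliable answer (Confidence: {confidence}%). {snippet}"
    return f"{cleaned}\n\n{snippet}"

print(confidence_gate(85, "Paris is the capital of France.", "capital of France"))
print(confidence_gate(None, "", "an obscure question"))  # empty answer falls back to the snippet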
 
@@ -138,33 +153,35 @@ def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> st
138
  except Exception:
139
  confidence_score = 0
140
  else:
141
- # heuristic: very short or empty cleaned response -> low confidence
142
- if not cleaned_response or len(cleaned_response.strip()) < 30:
143
- confidence_score = 10
144
- else:
145
- confidence_score = 85
146
 
147
  if confidence_score < CONFIDENCE_THRESHOLD:
148
- print(f"Low confidence ({confidence_score}%) detected (threshold={CONFIDENCE_THRESHOLD}). Triggering web-search fallback.")
 
149
  search_snippet = web_search_tool(prompt)
150
  if "error" in cleaned_response.lower() or confidence_score <= 5:
151
- final_response = f"I apologize for the limited response (Confidence: {confidence_score}%). {search_snippet} I will use this to generate a more comprehensive answer."
152
  else:
153
- final_response = f"{cleaned_response} {search_snippet} I can elaborate further based on this."
154
  else:
155
  final_response = cleaned_response
156
 
 
 
157
  return final_response
158
 
159
  def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
160
- """Perform VQA via the image_pipe. Returns a prompt-injection string for the LLM and success flag.
161
 
162
- If the VLM fails or returns nothing meaningful, return helpful instructions to the LLM rather than empty.
 
 
163
  """
164
  global image_pipe
165
  success = False
166
  if image_pipe is None:
167
- return f"[Image Processing Error: VLM model is not loaded.] **User Query:** {message}", success
168
 
169
  image = None
170
  try:
@@ -172,33 +189,76 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
172
  image = Image.open(image_data_or_path).convert("RGB")
173
  elif isinstance(image_data_or_path, np.ndarray):
174
  image = Image.fromarray(image_data_or_path).convert("RGB")
175
 
176
- if image:
177
- vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
178
- results = image_pipe(image, prompt=vqa_prompt, generate_kwargs={"max_new_tokens": 1024})
179
- raw_vlm_output = results[0].get('generated_text', "") if results and isinstance(results, list) else ""
180
- vqa_response = raw_vlm_output.split("ASSISTANT:")[-1].strip() if raw_vlm_output else ""
181
-
182
- # If empty or nonsense, produce a friendly fallback message
183
- if not vqa_response:
184
- vqa_response = (
185
- "VQA analysis returned no clear answer. Possible reasons: image unreadable, wrong crop, or "
186
- "ambiguous content. Please re-upload a clearer image or provide more context about what you want."
187
- )
188
- success = False
189
- else:
190
- success = True
191
-
192
- del image
193
- prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {message}"
194
- return prompt_injection, success
195
 
196
  except Exception as e:
197
- print(f"Image Pipeline Error: {e}")
198
  return f"[Image Processing Error: {e}] **User Query:** {message}", success
199
 
200
- return f"[Image Processing Error: Could not load image data.] **User Query:** {message}", success
201
-
202
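The replacement for process_image() (its added lines appear further down in this diff) normalizes several possible VLM pipeline return shapes: a dict, a list of dicts or strings, a bare string, or nothing at all. A minimal sketch of that normalization step in isolation, assuming only those shapes occur; normalize_vlm_output and the sample payload are illustrative, not part of app.py:

from typing import Any

def normalize_vlm_output(results: Any) -> str:
    # Same shape checks as the new process_image(): dict, list of dicts/strings, str, or other
    if results is None:
        return ""
    if isinstance(results, dict):
        return results.get("generated_text") or results.get("text") or ""
    if isinstance(results, list):
        if not results:
            return ""
        first = results[0]
        if isinstance(first, dict):
            return first.get("generated_text") or first.get("text") or ""
        return first if isinstance(first, str) else ""
    if isinstance(results, str):
        return results
    return str(results)

raw = normalize_vlm_output([{"generated_text": "USER: <image>\nASSISTANT: a cat on a sofa"}])
print(raw.split("ASSISTANT:")[-1].strip())  # a cat on a sofa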
  def transcribe_audio(audio_file_path: str) -> Tuple[str, str, gr.update, gr.update, bool, gr.update]:
203
  if stt_pipe is None or audio_file_path is None:
204
  error_msg = "Error: Whisper model failed to load or no audio recorded."
@@ -248,29 +308,27 @@ INTENT_STATUS_MAP = {
248
  }
249
 
250
  def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
251
- """Parses intent (and removes tags for display). Returns (intent, status, cleaned_text_for_display)."""
252
- match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
253
- intent = match.group(1).lower() if match else "default"
 
 
254
  if is_vqa_flow:
255
  intent = "vqa"
256
- # Remove only the display tags, keep raw_response intact elsewhere
257
- cleaned_text = re.sub(r'\[Intent:\s*[\w\-\_]+\]\s*', '', raw_response, count=1, flags=re.IGNORECASE).strip()
258
- cleaned_text = re.sub(r'\[Confidence:\s*\d{1,3}\]\s*', '', cleaned_text, count=1, flags=re.IGNORECASE).strip()
259
- cleaned_text = clean_response_stream(cleaned_text) # extra clean
260
  status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
261
  return intent, status, cleaned_text
262
 
263
  def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
264
- """Generates a file (Image, DOCX, PPTX) and returns the file path for download.
265
-
266
- If content is too short or missing, ask the user to clarify instead of producing empty files.
267
- """
268
  file_path = None
269
  try:
270
- if not content or len(content.strip()) < 20:
271
  history[-1]['content'] = (
272
- f"⚠️ I was instructed to generate a {file_type}, but I don't have enough details. "
273
- "Could you please provide a short description or title for the file (what should it contain)?"
274
  )
275
  return history, None
276
 
@@ -297,7 +355,6 @@ def generate_file_content(content: str, history: List[Dict[str, str]], file_type
297
  try:
298
  slide.placeholders[1].text = content[:200] + "..."
299
  except Exception:
300
- # fallback if layout mismatch
301
  pass
302
  file_filename = f"generated_ppt_{random.randint(1000, 9999)}.pptx"
303
  file_path = os.path.join(DOC_DIR, file_filename)
@@ -313,22 +370,21 @@ def generate_file_content(content: str, history: List[Dict[str, str]], file_type
313
  file_path = None
314
  return history, file_path
315
 
 
316
  # --- CORE GENERATOR FUNCTION ---
317
  def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
318
  """
319
- Returns: [chatbot, stop_signal, hint_box, txt, combined_btn, audio_output, is_voice_chat, fact_check_btn_row, staged_image, file_input, file_download_output]
320
- Changes made:
321
- - user_turn will now only append the user message. We add the assistant entry here once generation starts,
322
- so there's no empty assistant box created prematurely.
323
  """
324
-
325
- # Validate that last item is a USER (we expect user_turn to add only the user record)
326
  if not history or history[-1]['role'] != 'user':
327
  yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
328
  return
329
 
330
  last_user_index = len(history) - 1
331
- original_message = history[last_user_index]['content'] if history[last_user_index]['content'] is not None else ""
332
 
333
  # Detect VQA flow
334
  is_vqa_flow = False
@@ -339,36 +395,37 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
339
  else:
340
  is_vqa_flow = image_input_data is not None
341
 
342
- # Process image if present (returns prompt injection for LLM)
343
  vqa_success = False
344
  llm_input_message = original_message
345
  if is_vqa_flow:
346
  processed_message, vqa_success = process_image(image_input_data, original_message)
347
- # Replace the user's content with tag for logging while preserving original_message separately
348
  history[last_user_index]['content'] = f"[IMAGE RECEIVED] {original_message}"
349
  llm_input_message = processed_message
350
 
351
- # Build prompt (system + conversation)
352
  prompt = f"SYSTEM: {SYSTEM_PROMPT}\n"
353
- for item in history[:-1]: # all conversation before last user
354
  role = item['role'].upper()
355
- content = item['content'] if item['content'] is not None else ""
356
  if role == "ASSISTANT":
 
357
  prompt += f"LUNA: {content}\n"
358
  elif role == "USER":
359
  prompt += f"USER: {content}\n"
360
  prompt += f"USER: {llm_input_message}\nLUNA: "
361
 
362
- # Now create assistant entry only when we begin generation (avoids empty assistant box)
363
  assistant_initial_text = "✨ Luna is starting to think..."
364
  history.append({"role": "assistant", "content": assistant_initial_text})
365
 
366
- # Early UI update to show the thinking state (assistant box will appear now)
367
  yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
368
- time.sleep(0.2)
369
 
370
  full_response = ""
371
  current_intent = "default"
 
372
 
373
  try:
374
  stream = llm.create_completion(
@@ -378,70 +435,99 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
378
  )
379
  except Exception as e:
380
  error_text = f"❌ Error generating response: {e}"
381
- # update assistant with error
382
  history[-1]['content'] = error_text
383
  yield history, False, error_text, gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
384
  return
385
 
386
- # Stream tokens and update assistant content incrementally (without exposing tags)
387
  try:
388
  for output in stream:
 
 
 
 
 
 
 
389
  token = output["choices"][0].get("text", "")
 
 
390
  full_response += token
391
- current_intent, current_hint, display_text = get_intent_status(full_response, is_vqa_flow and vqa_success)
392
- # display_text is cleaned (no [Intent] or [Confidence])
393
- # Ensure we never set assistant content to empty — if cleaned is empty, show a small typing indicator
394
- history[-1]['content'] = display_text if display_text.strip() else " Luna is forming a reply..."
  yield history, stop_signal, current_hint, gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
 
396
  except Exception as e:
397
- # Stream interruption: salvage what we have
398
- _, _, final_response_text = get_intent_status(full_response, is_vqa_flow and vqa_success)
399
- error_msg = f"⚠️ Streaming interrupted: {e}"
400
- history[-1]['content'] = final_response_text if final_response_text.strip() else error_msg
401
- yield history, False, error_msg, gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
402
  return
403
 
404
  # POST-PROCESSING & TOOL EXECUTION
405
  file_download_path = None
406
  _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
 
407
 
408
- # If model wants to run a tool but content is weak, ask for clarification instead of generating empty files
409
  if current_intent == "image_generate":
410
- if not content_for_tool or len(content_for_tool.strip()) < 20:
411
- history[-1]['content'] = "I detected a request to generate an image but I don't have enough prompt details. Please give a short description: e.g. 'sunset over mountains, vibrant colors'."
412
  else:
413
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
414
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
415
  history, file_download_path = generate_file_content(content_for_tool, history, "image")
 
416
  elif current_intent == "doc_generate":
417
- if not content_for_tool or len(content_for_tool.strip()) < 20:
418
- history[-1]['content'] = "I was asked to generate a document but I need more details. What's the document about? (1–2 sentences.)"
419
  else:
420
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
421
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
422
  history, file_download_path = generate_file_content(content_for_tool, history, "doc")
 
423
  elif current_intent == "ppt_generate":
424
- if not content_for_tool or len(content_for_tool.strip()) < 20:
425
- history[-1]['content'] = "I can make a short presentation, but please give me a title and 3–5 bullet points to include."
426
  else:
427
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
428
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
429
  history, file_download_path = generate_file_content(content_for_tool, history, "ppt")
 
430
  elif current_intent == "open_google":
431
- final_cleaned_response = content_for_tool + "\n\n🔗 **Action:** [Search Google](https://www.google.com/search?q=open+google+simulated+search)"
432
- history[-1]['content'] = final_cleaned_response
 
433
  elif current_intent == "open_camera":
434
- final_cleaned_response = content_for_tool + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
435
- history[-1]['content'] = final_cleaned_response
 
436
  else:
437
- # Normal response path: check confidence and maybe augment with web-search snippet
438
  final_response_content = check_confidence_and_augment(full_response, original_message)
439
  history[-1]['content'] = final_response_content
440
 
441
- # If after all processing the assistant content is empty (defensive), fill a friendly fallback
442
  if not history[-1]['content'] or not str(history[-1]['content']).strip():
443
  history[-1]['content'] = "Sorry — I couldn't produce a good response. Can you rephrase or give more details?"
444
 
 
445
  audio_file_path = text_to_audio(history[-1]['content'], is_voice_chat)
446
 
447
  hint = "✅ Response generated."
@@ -456,9 +542,8 @@ def toggle_menu(current_visibility: bool) -> Tuple[bool, gr.update, gr.update, g
456
 
457
  def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
458
  """
459
- Appends only the USER message to chat_history. We no longer append an assistant placeholder here,
460
- so the UI won't show an empty assistant box immediately after user sends a message.
461
- The assistant will be appended inside chat_generator when generation begins.
462
  """
463
  has_text = bool(user_message and user_message.strip())
464
  has_image = False
@@ -472,17 +557,13 @@ def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_imag
472
  if not has_text and not has_image:
473
  return user_message, chat_history
474
 
475
- # Prevent double-sending if assistant is already generating (detect last assistant placeholder)
476
  if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
477
  return user_message, chat_history
478
 
479
- if not has_text and has_image:
480
- user_message_to_add = "Analyzing Staged Media."
481
- else:
482
- user_message_to_add = user_message.strip()
483
-
484
  chat_history.append({"role": "user", "content": user_message_to_add})
485
- # do NOT append assistant here — chat_generator will append assistant entry when it starts
486
  return "", chat_history
487
 
488
  def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
@@ -491,7 +572,6 @@ def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
491
  return None, "File upload cancelled.", gr.update(value="", interactive=True), gr.update(interactive=False)
492
 
493
  def clear_staged_media() -> gr.update:
494
- """Clears the staged media state component."""
495
  return gr.update(value=None)
496
 
497
  def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, gr.update]:
@@ -517,10 +597,7 @@ def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], s
517
 
518
 
519
  # --- GRADIO INTERFACE ---
520
-
521
  with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
522
-
523
- # --- State Components ---
524
  stop_signal = gr.State(value=False)
525
  is_voice_chat = gr.State(value=False)
526
  staged_image = gr.State(value=None)
@@ -558,7 +635,7 @@ with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
558
 
559
  output_components = [chatbot, stop_signal, hint_box, txt, combined_btn, audio_output, is_voice_chat, fact_check_btn_row, staged_image, file_input, file_download_output]
560
 
561
- # --- WIRE EVENTS ---
562
  btn_menu.click(
563
  fn=toggle_menu, inputs=[menu_visible_state], outputs=[menu_visible_state, menu_options_row, fact_check_btn_row, btn_menu], queue=False
564
  )
@@ -600,7 +677,6 @@ with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
600
 
601
  generator_inputs = [txt, staged_image, chatbot, stop_signal, is_voice_chat]
602
 
603
- # Text submit (Enter key)
604
  txt.submit(
605
  fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
606
  ).then(
@@ -609,7 +685,6 @@ with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
609
  fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False
610
  )
611
 
612
- # Send button click
613
  combined_btn.click(
614
  fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
615
  ).then(
 
14
  from docx import Document
15
  from pptx import Presentation
16
  from io import BytesIO
17
+ import numpy as np
18
 
19
  # --- CONFIGURATION & INITIALIZATION ---
20
  STT_DEVICE = "cpu"
 
30
  LOCAL_MODEL_PATH = MODEL_FILE
31
  SYSTEM_PROMPT = (
32
  "You are Luna, a helpful and friendly AI assistant. Your response must begin with two separate "
33
+ "tags: an Intent tag and a Confidence tag (0-100). Example: '[Intent: qa_general][Confidence: 85]'. "
34
  "Your full response must follow these tags."
35
  )
36
 
37
+ # --- TUNABLES / GUARDS ---
38
+ CONFIDENCE_THRESHOLD = 30 # only trigger web-search fallback if confidence is less than this
39
+ STREAM_CHAR_LIMIT = 35000 # hard cap on streaming response size (prevents runaway)
40
+ STREAM_ITER_LIMIT = 20000 # hard cap on streaming token iterations
41
+ MIN_MEANINGFUL_LENGTH = 20 # used when determining if a tool prompt is sufficient
42
 
43
  def safe_del(self):
44
  try:
 
103
  time.sleep(3)
104
  return None
105
 
106
+ def remove_all_tags(text: str) -> str:
107
+ """Remove ALL bracketed Intent/Confidence tags and any plain 'Intent:' or 'Confidence:' lines.
108
+ This ensures tags never leak into the UI.
109
  """
110
+ if not text:
111
+ return ""
112
+ # remove bracketed tags like [Intent: xyz] [Confidence: 85]
113
+ text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', text, flags=re.IGNORECASE)
114
+ text = re.sub(r'\[Confidence:\s*\d{1,3}\]', '', text, flags=re.IGNORECASE)
115
+ # remove any lines that start with "Intent:" or "Confidence:" (plain text)
116
+ text = re.sub(r'(?im)^\s*Intent:\s*.*$', '', text)
117
+ text = re.sub(r'(?im)^\s*Confidence:\s*.*$', '', text)
118
+ # collapse multiple blank lines
119
+ text = re.sub(r'\n\s*\n+', '\n\n', text).strip()
120
+ return text
121
+
122
+ def clean_response_stream(raw_text: str) -> str:
123
+ """Cleans up raw response text and removes any tag artifacts (safe for UI)."""
124
+ if not raw_text:
125
+ return ""
126
+ # Trim at common model separators to avoid carrying lots of leftover prompt text
127
+ truncated = re.split(r'\nUser:|\nAssistant:|</s>', raw_text, 1)[0].strip()
128
+ # Remove instruction tokens
129
+ truncated = re.sub(r'\[/?INST\]|\[/?s\]|\s*<action>.*?</action>\s*', '', truncated, flags=re.DOTALL).strip()
130
+ # Remove any explicit tags and any plain lines referencing Intent/Confidence
131
+ truncated = remove_all_tags(truncated)
132
+ # Deduplicate trivial repeated endings
133
+ words = truncated.split()
134
  if len(words) > 4 and words[-2:] == words[-4:-2]:
135
+ truncated = ' '.join(words[:-2])
136
+ return truncated.strip()
137
 
138
  def web_search_tool(query: str) -> str:
139
+ time.sleep(1.2)
140
  print(f"Simulating Google Search fallback for: {query}")
141
+ return f"\n\n🌐 **Web Search Results for '{query}':** I found supplemental info to help answer this."
142
 
143
  def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
144
+ """Checks confidence and optionally augments via web search. This function never returns raw tags."""
145
+ # Find only the first confidence occurrence (if any)
 
 
 
 
146
  confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
147
  cleaned_response = clean_response_stream(raw_response_with_tags)
148
 
 
153
  except Exception:
154
  confidence_score = 0
155
  else:
156
+ # heuristic: if the cleaned response is short or empty, regard as low confidence
157
+ confidence_score = 10 if not cleaned_response or len(cleaned_response) < 30 else 85
 
 
 
158
 
159
  if confidence_score < CONFIDENCE_THRESHOLD:
160
+ # very low confidence -> append search snippet
161
+ print(f"Low confidence ({confidence_score}%) detected (threshold={CONFIDENCE_THRESHOLD}). Using web fallback.")
162
  search_snippet = web_search_tool(prompt)
163
  if "error" in cleaned_response.lower() or confidence_score <= 5:
164
+ final_response = f"I apologize; I couldn't produce a reliable answer (Confidence: {confidence_score}%). {search_snippet}"
165
  else:
166
+ final_response = f"{cleaned_response}\n\n{search_snippet}\n\nIf you'd like, I can attempt a deeper search or try again."
167
  else:
168
  final_response = cleaned_response
169
 
170
+ # Ensure final_response contains no tags
171
+ final_response = remove_all_tags(final_response)
172
  return final_response
173
 
174
  def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
175
+ """Perform VQA via the image_pipe. Robust to different pipeline return types.
176
 
177
+ Returns:
178
+ - prompt_injection: text to include in LLM prompt describing the VQA output or error guidance
179
+ - success: whether the VLM produced a clear answer
180
  """
181
  global image_pipe
182
  success = False
183
  if image_pipe is None:
184
+ return f"[Image Processing Error: VLM model not loaded.] **User Query:** {message}", success
185
 
186
  image = None
187
  try:
 
189
  image = Image.open(image_data_or_path).convert("RGB")
190
  elif isinstance(image_data_or_path, np.ndarray):
191
  image = Image.fromarray(image_data_or_path).convert("RGB")
192
+ else:
193
+ # Unknown image container (e.g., bytes)
194
+ try:
195
+ image = Image.open(BytesIO(image_data_or_path)).convert("RGB")
196
+ except Exception:
197
+ image = None
198
+
199
+ if image is None:
200
+ return f"[Image Processing Error: Could not open image.] **User Query:** {message}", success
201
+
202
+ vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
203
+ # Some pipelines accept (image, prompt=...), some accept kwargs. Try both patterns and be defensive.
204
+ results = None
205
+ try:
206
+ results = image_pipe(image, prompt=vqa_prompt)
207
+ except TypeError:
208
+ # fallback signature
209
+ try:
210
+ results = image_pipe(image)
211
+ except Exception as e:
212
+ print(f"Image pipeline call failed: {e}")
213
+ results = None
214
+ except Exception as e:
215
+ print(f"Image pipeline call error: {e}")
216
+ results = None
217
+
218
+ raw_vlm_output = ""
219
+ if results is None:
220
+ raw_vlm_output = ""
221
+ elif isinstance(results, dict):
222
+ # some pipelines return dict with 'generated_text' or 'text'
223
+ raw_vlm_output = results.get('generated_text') or results.get('text') or ""
224
+ elif isinstance(results, list):
225
+ # list of dicts or strings
226
+ first = results[0]
227
+ if isinstance(first, dict):
228
+ raw_vlm_output = first.get('generated_text') or first.get('text') or ""
229
+ elif isinstance(first, str):
230
+ raw_vlm_output = first
231
+ elif isinstance(results, str):
232
+ raw_vlm_output = results
233
+ else:
234
+ # unknown shape -> convert to string safely
235
+ try:
236
+ raw_vlm_output = str(results)
237
+ except Exception:
238
+ raw_vlm_output = ""
239
 
240
+ # extract assistant part
241
+ vqa_response = raw_vlm_output.split("ASSISTANT:")[-1].strip() if raw_vlm_output else ""
242
+
243
+ # If no meaningful vqa_response, return a helpful fallback message
244
+ if not vqa_response or len(vqa_response) < 10:
245
+ vqa_response = (
246
+ "VQA analysis did not return a clear answer. The image might be unclear or the request ambiguous. "
247
+ "Please try re-uploading a clearer image, crop to the subject, or add a short instruction about what you'd like answered."
248
+ )
249
+ success = False
250
+ else:
251
+ success = True
252
+
253
+ # Always remove any tags to prevent leaking
254
+ vqa_response = remove_all_tags(vqa_response)
255
+ prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {message}"
256
+ return prompt_injection, success
 
 
257
 
258
  except Exception as e:
259
+ print(f"Image Pipeline Exception: {e}")
260
  return f"[Image Processing Error: {e}] **User Query:** {message}", success
261
 
 
 
262
  def transcribe_audio(audio_file_path: str) -> Tuple[str, str, gr.update, gr.update, bool, gr.update]:
263
  if stt_pipe is None or audio_file_path is None:
264
  error_msg = "Error: Whisper model failed to load or no audio recorded."
 
308
  }
309
 
310
  def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
311
+ """Parses intent (first occurrence only) and returns (intent, status, cleaned_display_text).
312
+ Importantly: this DOES NOT expose any tags — we remove them for display.
313
+ """
314
+ intent_match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
315
+ intent = intent_match.group(1).lower() if intent_match else "default"
316
  if is_vqa_flow:
317
  intent = "vqa"
318
+
319
+ # Clean raw_response for display: remove all tags and noisy prompt leftovers
320
+ cleaned_text = clean_response_stream(raw_response)
 
321
  status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
322
  return intent, status, cleaned_text
323
 
324
  def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
325
+ """Generates a file and writes it to disk. If content insufficient, asks for clarification."""
 
 
 
326
  file_path = None
327
  try:
328
+ if not content or len(content.strip()) < MIN_MEANINGFUL_LENGTH:
329
  history[-1]['content'] = (
330
+ f"⚠️ I was asked to create a {file_type}, but I don't have enough details. "
331
+ "Please provide a 1–2 sentence description of what the file should contain."
332
  )
333
  return history, None
334
 
 
355
  try:
356
  slide.placeholders[1].text = content[:200] + "..."
357
  except Exception:
 
358
  pass
359
  file_filename = f"generated_ppt_{random.randint(1000, 9999)}.pptx"
360
  file_path = os.path.join(DOC_DIR, file_filename)
 
370
  file_path = None
371
  return history, file_path
372
 
373
+
374
  # --- CORE GENERATOR FUNCTION ---
375
  def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
376
  """
377
+ - Assistant entry is appended ONLY when generation begins (avoids empty assistant box).
378
+ - Strict caps on streaming to avoid infinite printing.
379
+ - Strict removal of tags before any content is written to history for UI.
 
380
  """
381
+ # Validate last item is user's message
 
382
  if not history or history[-1]['role'] != 'user':
383
  yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
384
  return
385
 
386
  last_user_index = len(history) - 1
387
+ original_message = history[last_user_index]['content'] or ""
388
 
389
  # Detect VQA flow
390
  is_vqa_flow = False
 
395
  else:
396
  is_vqa_flow = image_input_data is not None
397
 
 
398
  vqa_success = False
399
  llm_input_message = original_message
400
  if is_vqa_flow:
401
  processed_message, vqa_success = process_image(image_input_data, original_message)
402
+ # mark user's entry (but keep original message around)
403
  history[last_user_index]['content'] = f"[IMAGE RECEIVED] {original_message}"
404
  llm_input_message = processed_message
405
 
406
+ # Build prompt
407
  prompt = f"SYSTEM: {SYSTEM_PROMPT}\n"
408
+ for item in history[:-1]:
409
  role = item['role'].upper()
410
+ content = item['content'] or ""
411
  if role == "ASSISTANT":
412
+ # ensure assistant content used in prompt still includes tags if the model expects them (we don't alter it)
413
  prompt += f"LUNA: {content}\n"
414
  elif role == "USER":
415
  prompt += f"USER: {content}\n"
416
  prompt += f"USER: {llm_input_message}\nLUNA: "
417
 
418
+ # Add assistant entry now (so it appears only when generation starts)
419
  assistant_initial_text = "✨ Luna is starting to think..."
420
  history.append({"role": "assistant", "content": assistant_initial_text})
421
 
422
+ # Early UI update: show thinking state
423
  yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
424
+ time.sleep(0.15)
425
 
426
  full_response = ""
427
  current_intent = "default"
428
+ iter_count = 0
429
 
430
  try:
431
  stream = llm.create_completion(
 
435
  )
436
  except Exception as e:
437
  error_text = f"❌ Error generating response: {e}"
 
438
  history[-1]['content'] = error_text
439
  yield history, False, error_text, gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
440
  return
441
 
442
+ # Stream tokens with strict caps and tag-removal for UI
443
  try:
444
  for output in stream:
445
+ iter_count += 1
446
+ if iter_count > STREAM_ITER_LIMIT:
447
+ # safety abort
448
+ full_response += "\n\n[Stream stopped: reached iteration limit]"
449
+ print("Stream aborted: iteration limit reached.")
450
+ break
451
+
452
  token = output["choices"][0].get("text", "")
453
+ if not isinstance(token, str):
454
+ token = str(token)
455
  full_response += token
456
+
457
+ # safety cap on characters to prevent runaway printing
458
+ if len(full_response) > STREAM_CHAR_LIMIT:
459
+ full_response = full_response[:STREAM_CHAR_LIMIT] + "\n\n[Truncated: stream length limit reached]"
460
+ print("Stream truncated: char limit reached.")
461
+ break
462
+
463
+ # parse intent/status and cleaned display without exposing tags
464
+ current_intent, current_hint, cleaned_display = get_intent_status(full_response, is_vqa_flow and vqa_success)
465
+
466
+ # enforce tag-suppression: cleaned_display MUST NOT contain tag patterns
467
+ cleaned_display = remove_all_tags(cleaned_display)
468
+ # guarantee non-empty display while streaming
469
+ if not cleaned_display.strip():
470
+ cleaned_display = "✨ Luna is forming a reply..."
471
+ history[-1]['content'] = cleaned_display
472
+
473
  yield history, stop_signal, current_hint, gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
474
+
475
  except Exception as e:
476
+ # On exception, salvage partial output
477
+ _, _, final_clean = get_intent_status(full_response, is_vqa_flow and vqa_success)
478
+ final_clean = remove_all_tags(final_clean) or f"⚠️ Streaming interrupted: {e}"
479
+ history[-1]['content'] = final_clean
480
+ yield history, False, f"⚠️ Streaming interrupted: {e}", gr.update(interactive=True), gr.update(value="↑", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
481
  return
482
 
483
  # POST-PROCESSING & TOOL EXECUTION
484
  file_download_path = None
485
  _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
486
+ content_for_tool = remove_all_tags(content_for_tool) # ensure no tags in tool prompts
487
 
488
+ # Handle tool intents, but require sufficient content; otherwise ask for clarification
489
  if current_intent == "image_generate":
490
+ if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
491
+ history[-1]['content'] = "I detected an image generation intent but didn't get enough details. Please give a short prompt (e.g. 'a red bicycle in a park at sunrise')."
492
  else:
493
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
494
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
495
  history, file_download_path = generate_file_content(content_for_tool, history, "image")
496
+
497
  elif current_intent == "doc_generate":
498
+ if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
499
+ history[-1]['content'] = "I can create a document, but I need a 1–2 sentence description of what to include."
500
  else:
501
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
502
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
503
  history, file_download_path = generate_file_content(content_for_tool, history, "doc")
504
+
505
  elif current_intent == "ppt_generate":
506
+ if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
507
+ history[-1]['content'] = "I can make a presentation. Please give a title and 3–5 bullet points."
508
  else:
509
  history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
510
  yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
511
  history, file_download_path = generate_file_content(content_for_tool, history, "ppt")
512
+
513
  elif current_intent == "open_google":
514
+ final_cleaned_response = (content_for_tool or "").strip() + "\n\n🔗 **Action:** [Search Google](https://www.google.com/search?q=open+google+simulated+search)"
515
+ history[-1]['content'] = remove_all_tags(final_cleaned_response)
516
+
517
  elif current_intent == "open_camera":
518
+ final_cleaned_response = (content_for_tool or "").strip() + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
519
+ history[-1]['content'] = remove_all_tags(final_cleaned_response)
520
+
521
  else:
522
+ # default path: evaluate confidence and optionally augment with web search
523
  final_response_content = check_confidence_and_augment(full_response, original_message)
524
  history[-1]['content'] = final_response_content
525
 
526
+ # Final defensive fallback
527
  if not history[-1]['content'] or not str(history[-1]['content']).strip():
528
  history[-1]['content'] = "Sorry — I couldn't produce a good response. Can you rephrase or give more details?"
529
 
530
+ # convert to audio if requested
531
  audio_file_path = text_to_audio(history[-1]['content'], is_voice_chat)
532
 
533
  hint = "✅ Response generated."
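The streaming loop added above is bounded by two guards, STREAM_ITER_LIMIT and STREAM_CHAR_LIMIT, so a runaway completion cannot print indefinitely. A simplified, self-contained sketch of that guard, with dummy_stream standing in for llm.create_completion(..., stream=True); the cap values match the diff, the helper names are illustrative:

STREAM_CHAR_LIMIT = 35000   # same caps as the diff
STREAM_ITER_LIMIT = 20000

def dummy_stream():
    # Stand-in for the llama.cpp stream: yields token chunks forever
    while True:
        yield {"choices": [{"text": "token "}]}

def capped_stream(stream) -> str:
    full = ""
    for i, output in enumerate(stream, start=1):
        if i > STREAM_ITER_LIMIT:
            full += "\n\n[Stream stopped: reached iteration limit]"
            break
        full += output["choices"][0].get("text", "")
        if len(full) > STREAM_CHAR_LIMIT:
            full = full[:STREAM_CHAR_LIMIT] + "\n\n[Truncated: stream length limit reached]"
            break
    return full

text = capped_stream(dummy_stream())
print(len(text))  # bounded no matter how long the dummy stream would run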
 
542
 
543
  def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
544
  """
545
+ Appends only the USER message to chat_history. Assistant entry is appended inside chat_generator
546
+ once generation starts (avoids empty assistant box).
 
547
  """
548
  has_text = bool(user_message and user_message.strip())
549
  has_image = False
 
557
  if not has_text and not has_image:
558
  return user_message, chat_history
559
 
560
+ # Prevent double-send if assistant already generating
561
  if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
562
  return user_message, chat_history
563
 
564
+ user_message_to_add = "Analyzing Staged Media." if (not has_text and has_image) else user_message.strip()
 
 
 
 
565
  chat_history.append({"role": "user", "content": user_message_to_add})
566
+ # DO NOT append assistant here
567
  return "", chat_history
568
 
569
  def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
 
572
  return None, "File upload cancelled.", gr.update(value="", interactive=True), gr.update(interactive=False)
573
 
574
  def clear_staged_media() -> gr.update:
 
575
  return gr.update(value=None)
576
 
577
  def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, gr.update]:
 
597
 
598
 
599
  # --- GRADIO INTERFACE ---
 
600
  with gr.Blocks(theme=gr.themes.Soft(), title="Luna Coding Partner") as demo:
 
 
601
  stop_signal = gr.State(value=False)
602
  is_voice_chat = gr.State(value=False)
603
  staged_image = gr.State(value=None)
 
635
 
636
  output_components = [chatbot, stop_signal, hint_box, txt, combined_btn, audio_output, is_voice_chat, fact_check_btn_row, staged_image, file_input, file_download_output]
637
 
638
+ # --- WIRING ---
639
  btn_menu.click(
640
  fn=toggle_menu, inputs=[menu_visible_state], outputs=[menu_visible_state, menu_options_row, fact_check_btn_row, btn_menu], queue=False
641
  )
 
677
 
678
  generator_inputs = [txt, staged_image, chatbot, stop_signal, is_voice_chat]
679
 
 
680
  txt.submit(
681
  fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
682
  ).then(
 
685
  fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False
686
  )
687
 
 
688
  combined_btn.click(
689
  fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
690
  ).then(