Update app.py

app.py CHANGED
@@ -19,7 +19,6 @@ import numpy as np
# --- CONFIGURATION & INITIALIZATION ---
# Use string 'cpu' or GPU index string/int like '0'
USER_DEVICE = "cpu"  # keep as "cpu" on CPU-only hosts; change to "0" for GPU 0
-# convert to values pipelines expect
PIPELINE_DEVICE = -1 if str(USER_DEVICE).lower() == "cpu" else int(USER_DEVICE)
TORCH_DEVICE = torch.device("cuda") if torch.cuda.is_available() and PIPELINE_DEVICE != -1 else torch.device("cpu")
@@ -35,7 +34,6 @@ REPO_ID = "cosmosai471/Luna-v3"
MODEL_FILE = "luna.gguf"
LOCAL_MODEL_PATH = MODEL_FILE

-# Note: model is expected to prefix outputs with [Intent: ...][Confidence: ...] but user wants those strictly hidden.
SYSTEM_PROMPT = (
    "You are Luna, a helpful and friendly AI assistant. For internal tracing you may place Intent/Confidence tags, "
    "but DO NOT expose these tags in the user-facing response. Any Intent/Confidence/Action metadata must be kept internal."
@@ -46,13 +44,16 @@ CONFIDENCE_THRESHOLD = 30 # trigger web-search fallback only under this
STREAM_CHAR_LIMIT = 35000  # cap streaming characters
STREAM_ITER_LIMIT = 20000  # cap streaming iterations
MIN_MEANINGFUL_LENGTH = 20  # min length for file-generation prompts

def safe_del(self):
    try:
        if hasattr(self, "close") and callable(self.close):
            self.close()
    except Exception:
        pass

Llama.__del__ = safe_del

# --- MODEL LOADING ---
@@ -103,15 +104,14 @@ try:
except Exception as e:
    print(f"⚠️ Could not load Image Generation pipeline. Image generation disabled. Error: {e}")

-
-# --- STRICT SANITIZERS & UTILITIES ---

def simulate_recording_delay():
    time.sleep(3)
    return None

def remove_bracketed_tags(text: str) -> str:
-    """Remove bracketed tags like [Intent: ...] [Confidence: ...] exactly."""
    if not text:
        return ""
    text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', text, flags=re.IGNORECASE)
@@ -119,8 +119,8 @@ def remove_bracketed_tags(text: str) -> str:
    text = re.sub(r'\[Action:\s*[^\]]+\]', '', text, flags=re.IGNORECASE)
    return text

-def remove_plain_labels_lines(text: str) -> str:
-    """Remove
    if not text:
        return ""
    text = re.sub(r'(?im)^\s*Intent\s*[:\-]\s*.*$', '', text)
@@ -128,56 +128,48 @@ def remove_plain_labels_lines(text: str) -> str:
    text = re.sub(r'(?im)^\s*Action\s*[:\-]\s*.*$', '', text)
    return text

-def
-    """
-
-
-    """
    if not text:
        return ""
-    #
-
-
-
-
-
-
-
-
-    # also remove isolated numeric sequences that remain on their own
-    new = re.sub(r'\b\d{1,3}(?:\s+\d{1,3})*\b', '', new)
-    return new
-
-def collapse_whitespace_and_punct(text: str) -> str:
    if not text:
        return ""
-    # collapse multiple newlines and trim whitespace
    text = re.sub(r'\n\s*\n+', '\n\n', text)
-    # remove excessive spaces
    text = re.sub(r'[ \t]{2,}', ' ', text)
-    # trim leading/trailing
    return text.strip()

-def strict_sanitize_for_ui(raw: str) -> str:
    """
-
-
    """
    if not raw:
        return ""
    s = raw
    s = remove_bracketed_tags(s)
-    s =
-    s =
-    s =
-    # final guard to remove
    s = re.sub(r'(?i)\bIntent\b', '', s)
    s = re.sub(r'(?i)\bConfidence\b', '', s)
    s = re.sub(r'(?i)\bAction\b', '', s)
-    s =
    return s.strip()

def web_search_tool(query: str) -> str:
    time.sleep(1.2)
    print(f"Simulating Google Search fallback for: {query}")
@@ -186,10 +178,9 @@ def web_search_tool(query: str) -> str:
def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
    """
    Internal: parse confidence if present (for logic only), but never display it. If fallback triggered,
-    append web results to sanitized response.
    """
-    cleaned_for_logic = remove_bracketed_tags(raw_response_with_tags)
-    # extract confidence if any (internal only)
    confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
    if confidence_match:
        try:
@@ -198,32 +189,38 @@ def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> st
        except Exception:
            confidence_score = 0
    else:
-
-        cleaned_no_tags = strict_sanitize_for_ui(cleaned_for_logic)
        confidence_score = 10 if not cleaned_no_tags or len(cleaned_no_tags) < 30 else 85

-    # If low, augment with web snippet; final output is always sanitized
    if confidence_score < CONFIDENCE_THRESHOLD:
        print(f"[internal] Low confidence ({confidence_score}%) detected -> using web fallback")
        supplement = web_search_tool(prompt)
-        out =
        if not out:
-            out = "I couldn't generate a reliable answer. " +
        else:
-            out = out + "\n\n" +
    else:
-        out =

-    # final guard: don't return empty
    out = out or "Sorry β I couldn't produce a good answer. Could you rephrase or give more details?"
    return out

-# --- IMAGE / VQA PROCESSING (robust) ---

def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
    """
-    Uses image_pipe to produce VQA text.
-
    """
    global image_pipe
    success = False
@@ -237,7 +234,6 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
        elif isinstance(image_data_or_path, np.ndarray):
            image = Image.fromarray(image_data_or_path).convert("RGB")
        else:
-            # bytes or file-like
            try:
                image = Image.open(BytesIO(image_data_or_path)).convert("RGB")
            except Exception:
@@ -246,10 +242,13 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
        if image is None:
            return f"[Image Processing Error: Could not open image.] **User Query:** {message}", False

        vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
-        # defensive calls: some pipelines accept prompt kwarg, some don't
        results = None
        try:
            results = image_pipe(image, prompt=vqa_prompt)
        except TypeError:
            try:
@@ -280,9 +279,8 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
        except Exception:
            raw_text = ""

-        # pick assistant section if available
        vqa_response = raw_text.split("ASSISTANT:")[-1].strip() if raw_text else ""
-        vqa_response =

        if not vqa_response or len(vqa_response) < 10:
            vqa_response = (
@@ -293,13 +291,12 @@ def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
        else:
            success = True

-
-        prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {strict_sanitize_for_ui(message)}"
        return prompt_injection, success

    except Exception as e:
        print(f"Image processing exception: {e}")
-        return f"[Image Processing Error: {e}] **User Query:** {

# --- AUDIO / TTS ---
@@ -322,6 +319,7 @@ def transcribe_audio(audio_file_path: str) -> Tuple[str, str, gr.update, gr.upda
        error_msg = f"Transcription Error: {e}"
        return "", error_msg, gr.update(interactive=True), gr.update(value="β", interactive=True, elem_classes=["circle-btn", "send-mode"]), False, gr.update(visible=False)

def text_to_audio(text: str, is_voice_chat: bool) -> str or None:
    if not is_voice_chat:
        return None
@@ -337,7 +335,7 @@ def text_to_audio(text: str, is_voice_chat: bool) -> str or None:
        return None
    return None

-# --- INTENT
INTENT_STATUS_MAP = {
    "code_generate": "Analyzing requirements and drafting code 💻...",
    "code_explain": "Reviewing code logic and writing explanation 💡...",
@@ -352,22 +350,47 @@ INTENT_STATUS_MAP = {
    "default": "Luna is thinking...",
}

def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
    """
    Internal parsing: returns (intent, status, cleaned_display_text).
-    cleaned_display_text
    """
    intent_match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
-    intent = intent_match.group(1).lower() if intent_match else
    if is_vqa_flow:
        intent = "vqa"

-
-
    status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
    return intent, status, cleaned_text

# --- FILE / IMAGE GENERATION ---

def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
    file_path = None
    try:
@@ -417,13 +440,13 @@ def generate_file_content(content: str, history: List[Dict[str, str]], file_type
    return history, file_path

# --- CORE GENERATOR FUNCTION ---

def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
    """
-    - assistant entry
-    - streaming
-    -
    """
-    # Expect last entry to be user
    if not history or history[-1]['role'] != 'user':
        yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return
@@ -431,7 +454,7 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    last_user_index = len(history) - 1
    original_message = history[last_user_index]['content'] or ""

-    # detect VQA flow
    is_vqa_flow = False
    if isinstance(image_input_data, str):
        is_vqa_flow = bool(image_input_data)
@@ -444,8 +467,8 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    llm_input_message = original_message
    if is_vqa_flow:
        processed_message, vqa_success = process_image(image_input_data, original_message)
-
-
        llm_input_message = processed_message

    # build prompt
@@ -459,11 +482,10 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
        prompt += f"USER: {content}\n"
    prompt += f"USER: {llm_input_message}\nLUNA: "

-    # append assistant entry
    assistant_initial_text = "✨ Luna is starting to think..."
    history.append({"role": "assistant", "content": assistant_initial_text})

-    # initial UI hint
    yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
    time.sleep(0.12)
@@ -479,11 +501,11 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
        )
    except Exception as e:
        err = f"β Error generating response: {e}"
-        history[-1]['content'] =
        yield history, False, err, gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return

-    # stream tokens;
    try:
        for output in stream:
            iter_count += 1
@@ -502,10 +524,8 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
                print("Stream truncated by char limit.")
                break

-            # extract intent/status for internal UX hints but NEVER show tags
            current_intent, current_hint, interim = get_intent_status(full_response, is_vqa_flow and vqa_success)
-
-            interim_ui = strict_sanitize_for_ui(interim)
            if not interim_ui:
                interim_ui = "✨ Luna is forming a reply..."
            history[-1]['content'] = interim_ui
@@ -514,7 +534,7 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    except Exception as e:
        _, _, salvage = get_intent_status(full_response, is_vqa_flow and vqa_success)
-        salvage_ui =
        history[-1]['content'] = salvage_ui
        yield history, False, f"⚠️ Streaming interrupted: {e}", gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
        return
@@ -522,7 +542,7 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
    # post-process
    file_download_path = None
    _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
-    content_for_tool =

    if current_intent == "image_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
@@ -549,19 +569,17 @@ def chat_generator(message_from_input: str, image_input_data: Any, history: List
        history, file_download_path = generate_file_content(content_for_tool, history, "ppt")

    elif current_intent == "open_google":
-        final_text = (content_for_tool or "").strip() + "\n\nπ **Action:** [Search Google](https://www.google.com/search?q=" + re.sub(r'\s+', '+',
-        history[-1]['content'] =

    elif current_intent == "open_camera":
        final_text = (content_for_tool or "").strip() + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
-        history[-1]['content'] =

    else:
-        # normal path: evaluate confidence and maybe augment
        final_response_content = check_confidence_and_augment(full_response, original_message)
        history[-1]['content'] = final_response_content

-    # final defensive fallback
    if not history[-1]['content'] or not str(history[-1]['content']).strip():
        history[-1]['content'] = "Sorry β I couldn't produce a useful response. Could you rephrase or add details?"
@@ -576,6 +594,7 @@ def toggle_menu(current_visibility: bool) -> Tuple[bool, gr.update, gr.update, g
    new_visibility = not current_visibility
    return new_visibility, gr.update(visible=new_visibility), gr.update(visible=False), gr.update(value="⬆️" if new_visibility else "β")

def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
    has_text = bool(user_message and user_message.strip())
    has_image = False
@@ -589,22 +608,24 @@ def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_imag
    if not has_text and not has_image:
        return user_message, chat_history

-    # prevent double send if assistant currently thinking
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
        return user_message, chat_history

    user_message_to_add = "Analyzing Staged Media." if (not has_text and has_image) else user_message.strip()
-    chat_history.append({"role": "user", "content":
    return "", chat_history

def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
    if file_path:
        return file_path, f"π File staged: {os.path.basename(file_path)}. Click send (βοΈ).", gr.update(value="", interactive=True), gr.update(interactive=False)
    return None, "File upload cancelled.", gr.update(value="", interactive=True), gr.update(interactive=False)

def clear_staged_media() -> gr.update:
    return gr.update(value=None)

def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, gr.update]:
    if not history or not history[-1]['content']:
        return history, "Error: No final response to check.", gr.update(visible=False)
@@ -617,9 +638,10 @@ def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str
        return history, "Error: Could not find query.", gr.update(visible=False)
    web_results = web_search_tool(last_user_prompt)
    new_history = list(history)
-    new_history[-1]['content'] += "\n\n" +
    return new_history, "β Double-checked with web facts.", gr.update(visible=False)

def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]], Any, gr.update, gr.update, gr.update, gr.update, gr.update]:
    _, chat_history = user_turn(user_message, chat_history, staged_image_input)
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] == "":
@@ -627,13 +649,13 @@ def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], s
    return "", chat_history, staged_image_input, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(value="📸 Capturing in 3 seconds...", interactive=False), gr.update(value="β")

# --- GRADIO UI ---
-with gr.Blocks(theme=gr.themes.Soft(), title="
    stop_signal = gr.State(value=False)
    is_voice_chat = gr.State(value=False)
    staged_image = gr.State(value=None)
    menu_visible_state = gr.State(value=False)

-    gr.HTML("<h1 style='text-align: center; color: #4B0082;'

    hint_box = gr.Textbox(value="Ask anything", lines=1, show_label=False, interactive=False, placeholder="Luna's Action...", visible=True)
    file_download_output = gr.File(label="Generated File", visible=False)
# --- CONFIGURATION & INITIALIZATION ---
# Use string 'cpu' or GPU index string/int like '0'
USER_DEVICE = "cpu"  # keep as "cpu" on CPU-only hosts; change to "0" for GPU 0
PIPELINE_DEVICE = -1 if str(USER_DEVICE).lower() == "cpu" else int(USER_DEVICE)
TORCH_DEVICE = torch.device("cuda") if torch.cuda.is_available() and PIPELINE_DEVICE != -1 else torch.device("cpu")

MODEL_FILE = "luna.gguf"
LOCAL_MODEL_PATH = MODEL_FILE

SYSTEM_PROMPT = (
    "You are Luna, a helpful and friendly AI assistant. For internal tracing you may place Intent/Confidence tags, "
    "but DO NOT expose these tags in the user-facing response. Any Intent/Confidence/Action metadata must be kept internal."

STREAM_CHAR_LIMIT = 35000  # cap streaming characters
STREAM_ITER_LIMIT = 20000  # cap streaming iterations
MIN_MEANINGFUL_LENGTH = 20  # min length for file-generation prompts
+IMAGE_MAX_SIDE = 1024  # resize images to this max side before sending to image pipeline

+# safe destructor for Llama objects
def safe_del(self):
    try:
        if hasattr(self, "close") and callable(self.close):
            self.close()
    except Exception:
        pass
+
Llama.__del__ = safe_del

# --- MODEL LOADING ---

except Exception as e:
    print(f"⚠️ Could not load Image Generation pipeline. Image generation disabled. Error: {e}")

+# --- SANITIZERS & UTILITIES ---

def simulate_recording_delay():
    time.sleep(3)
    return None

def remove_bracketed_tags(text: str) -> str:
+    """Remove bracketed tags like [Intent: ...] [Confidence: ...] exactly (safe)."""
    if not text:
        return ""
    text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', text, flags=re.IGNORECASE)

    text = re.sub(r'\[Action:\s*[^\]]+\]', '', text, flags=re.IGNORECASE)
    return text

+def remove_plain_tag_lines(text: str) -> str:
+    """Remove whole lines that are just 'Intent: ...' or 'Confidence: ...' preserving inline content."""
    if not text:
        return ""
    text = re.sub(r'(?im)^\s*Intent\s*[:\-]\s*.*$', '', text)

    text = re.sub(r'(?im)^\s*Action\s*[:\-]\s*.*$', '', text)
    return text

+def remove_word_number_dumps(text: str) -> str:
+    """Remove big classifier dumps like 'greeting 99 2. goodbye 99' but try to preserve normal text.
+    This removes sequences where a word token is followed immediately by 1-3 numbers and repeats (likely classifier logs).
+    Only removes when they appear as standalone clusters (surrounded by line breaks or punctuation)."""
    if not text:
        return ""
+    # find clusters between line boundaries or punctuation
+    cluster_pattern = re.compile(r'(?:\n|^|[\(\[\{\.;:,\-\|>])\s*([a-zA-Z_\-]{2,40}(?:\s+\d{1,3}){1,4}(?:\s+[a-zA-Z_\-]{2,40}(?:\s+\d{1,3}){1,4})*)\s*(?:\n|$|[\)\]\}\.;:,\-\|<])', flags=re.IGNORECASE)
+    def _strip_cluster(m):
+        return '\n'  # replace cluster with a newline to preserve sentence boundaries
+    text = cluster_pattern.sub(_strip_cluster, text)
+    # remove leftover isolated numeric sequences (only small groups)
+    text = re.sub(r'\b\d{2,3}(?:\s+\d{1,3})*\b', '', text)
+    return text
+
+def collapse_whitespace(text: str) -> str:
    if not text:
        return ""
    text = re.sub(r'\n\s*\n+', '\n\n', text)
    text = re.sub(r'[ \t]{2,}', ' ', text)
    return text.strip()

+def moderate_sanitize_for_ui(raw: str) -> str:
    """
+    Moderate sanitizer: removes bracketed tags, whole tag-lines, and classifier dumps (carefully),
+    but otherwise preserves natural language content.
    """
    if not raw:
        return ""
    s = raw
    s = remove_bracketed_tags(s)
+    s = remove_plain_tag_lines(s)
+    s = remove_word_number_dumps(s)
+    s = collapse_whitespace(s)
+    # final quick guard to remove exact words 'Intent' or 'Confidence' if accidentally left alone
    s = re.sub(r'(?i)\bIntent\b', '', s)
    s = re.sub(r'(?i)\bConfidence\b', '', s)
    s = re.sub(r'(?i)\bAction\b', '', s)
+    s = collapse_whitespace(s)
    return s.strip()
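The snippet below is an illustrative sketch of what the tag-stripping stage removes; it is not part of app.py. It applies patterns matching those used in remove_bracketed_tags and check_confidence_and_augment above to a made-up model response, with the expected result noted in a comment.

import re

# Sample raw model output (invented for illustration).
raw = "[Intent: code_generate][Confidence: 87] Here is the function you asked for."

# Same style of patterns as the sanitizer above.
cleaned = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', raw, flags=re.IGNORECASE)
cleaned = re.sub(r'\[Confidence:\s*[0-9]{1,3}\]', '', cleaned, flags=re.IGNORECASE)

print(cleaned.strip())  # -> "Here is the function you asked for."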
+# web-search stub
def web_search_tool(query: str) -> str:
    time.sleep(1.2)
    print(f"Simulating Google Search fallback for: {query}")

def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
    """
    Internal: parse confidence if present (for logic only), but never display it. If fallback triggered,
+    append web results to sanitized response. Uses moderate sanitizer to avoid eating valid content.
    """
+    cleaned_for_logic = remove_bracketed_tags(raw_response_with_tags)

    confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
    if confidence_match:
        try:

        except Exception:
            confidence_score = 0
    else:
+        cleaned_no_tags = moderate_sanitize_for_ui(cleaned_for_logic)

        confidence_score = 10 if not cleaned_no_tags or len(cleaned_no_tags) < 30 else 85

    if confidence_score < CONFIDENCE_THRESHOLD:
        print(f"[internal] Low confidence ({confidence_score}%) detected -> using web fallback")
        supplement = web_search_tool(prompt)
+        out = moderate_sanitize_for_ui(cleaned_for_logic)
        if not out:
+            out = "I couldn't generate a reliable answer. " + moderate_sanitize_for_ui(supplement)
        else:
+            out = out + "\n\n" + moderate_sanitize_for_ui(supplement)
    else:
+        out = moderate_sanitize_for_ui(cleaned_for_logic)

    out = out or "Sorry β I couldn't produce a good answer. Could you rephrase or give more details?"
    return out
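As an illustrative sketch (not part of app.py), this is how the bracketed confidence tag drives the fallback decision; the sample strings are invented, and the threshold value comes from the CONFIDENCE_THRESHOLD constant shown earlier in the diff.

import re

CONFIDENCE_THRESHOLD = 30  # same value as in the configuration section

def parse_confidence(raw: str) -> int:
    # Mirrors the re.search used in check_confidence_and_augment.
    m = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw, flags=re.IGNORECASE)
    return int(m.group(1)) if m else 0

print(parse_confidence("[Confidence: 22] Not sure about this one."))       # 22 -> below threshold, web fallback
print(parse_confidence("[Confidence: 87] Paris is the capital of France.")) # 87 -> answer returned as-is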
+# --- IMAGE / VQA PROCESSING (robust + resize) ---
+
+def _resize_image_keep_aspect(img: Image.Image, max_side: int) -> Image.Image:
+    w, h = img.size
+    if max(w, h) <= max_side:
+        return img
+    scale = max_side / float(max(w, h))
+    new_w = int(w * scale)
+    new_h = int(h * scale)
+    return img.resize((new_w, new_h), Image.LANCZOS)

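A minimal sketch of what _resize_image_keep_aspect does to an oversized input (not part of app.py; the 4096x2048 size is an invented example and Pillow is assumed to be installed):

from PIL import Image

img = Image.new("RGB", (4096, 2048))       # stand-in for an uploaded photo
scale = 1024 / max(img.size)               # IMAGE_MAX_SIDE / longest side = 0.25
resized = img.resize((int(img.width * scale), int(img.height * scale)), Image.LANCZOS)
print(resized.size)                        # (1024, 512): longest side capped, aspect ratio kept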
def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
    """
+    Uses image_pipe to produce VQA text. Resizes image to avoid token/feature mismatch issues.
+    Returns prompt-injection (safe) + success flag.
    """
    global image_pipe
    success = False

        elif isinstance(image_data_or_path, np.ndarray):
            image = Image.fromarray(image_data_or_path).convert("RGB")
        else:
            try:
                image = Image.open(BytesIO(image_data_or_path)).convert("RGB")
            except Exception:

        if image is None:
            return f"[Image Processing Error: Could not open image.] **User Query:** {message}", False

+        # Resize defensively before passing to VLM pipeline (fixes token/features mismatch errors)
+        image = _resize_image_keep_aspect(image, IMAGE_MAX_SIDE)
+
        vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
        results = None
        try:
+            # preferred signature
            results = image_pipe(image, prompt=vqa_prompt)
        except TypeError:
            try:

        except Exception:
            raw_text = ""

        vqa_response = raw_text.split("ASSISTANT:")[-1].strip() if raw_text else ""
+        vqa_response = moderate_sanitize_for_ui(vqa_response)

        if not vqa_response or len(vqa_response) < 10:
            vqa_response = (

        else:
            success = True

+        prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {moderate_sanitize_for_ui(message)}"
        return prompt_injection, success

    except Exception as e:
        print(f"Image processing exception: {e}")
+        return f"[Image Processing Error: {e}] **User Query:** {moderate_sanitize_for_ui(message)}", False

# --- AUDIO / TTS ---

        error_msg = f"Transcription Error: {e}"
        return "", error_msg, gr.update(interactive=True), gr.update(value="β", interactive=True, elem_classes=["circle-btn", "send-mode"]), False, gr.update(visible=False)

+
def text_to_audio(text: str, is_voice_chat: bool) -> str or None:
    if not is_voice_chat:
        return None

        return None
    return None

+# --- INTENT MAP & PARSING ---
INTENT_STATUS_MAP = {
    "code_generate": "Analyzing requirements and drafting code 💻...",
    "code_explain": "Reviewing code logic and writing explanation 💡...",

    "default": "Luna is thinking...",
}

+# Additional keyword-based intent inference (helps when model doesn't include tags)
+INTENT_KEYWORD_MAP = [
+    (re.compile(r"\b(create|generate|make)\b.*\b(image|picture|photo|art)\b", flags=re.IGNORECASE), "image_generate"),
+    (re.compile(r"\b(create|generate|make)\b.*\b(document|doc|report|letter|resume)\b", flags=re.IGNORECASE), "doc_generate"),
+    (re.compile(r"\b(create|generate|make)\b.*\b(presentation|ppt|slides)\b", flags=re.IGNORECASE), "ppt_generate"),
+]
+
+def infer_intent_from_content(text: str) -> str:
+    if not text:
+        return "default"
+    for patt, intent in INTENT_KEYWORD_MAP:
+        if patt.search(text):
+            return intent
+    return "default"
+
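Illustrative sketch only (not part of app.py): the keyword fallback reduced to its first pattern, run on two invented prompts.

import re

image_intent = re.compile(r"\b(create|generate|make)\b.*\b(image|picture|photo|art)\b", re.IGNORECASE)

for prompt in ["Can you make a picture of a red fox?", "Explain this traceback"]:
    print("image_generate" if image_intent.search(prompt) else "default")
# -> image_generate, then default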
def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
    """
    Internal parsing: returns (intent, status, cleaned_display_text).
+    cleaned_display_text preserves content but strips tags/garbage moderately.
+    If no explicit [Intent:] tag is found, infer intent from content_for_tool keywords.
    """
    intent_match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
+    intent = intent_match.group(1).lower() if intent_match else None
    if is_vqa_flow:
        intent = "vqa"

+    cleaned_text = moderate_sanitize_for_ui(raw_response)
+    # If no explicit intent from tags, try to infer from cleaned_text
+    if not intent or intent == "default":
+        inferred = infer_intent_from_content(cleaned_text)
+        if inferred != "default":
+            intent = inferred
+    intent = intent or "default"
+
    status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
    return intent, status, cleaned_text

# --- FILE / IMAGE GENERATION ---
+
def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
    file_path = None
    try:

    return history, file_path

# --- CORE GENERATOR FUNCTION ---
+
def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
    """
+    - assistant entry appended only when generation actually starts (no empty box).
+    - streaming sanitized moderately to keep meaning while removing metadata.
+    - when image is attached, VQA flow is strictly used (image model output injected to LLM).
    """

    if not history or history[-1]['role'] != 'user':
        yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return

    last_user_index = len(history) - 1
    original_message = history[last_user_index]['content'] or ""

+    # detect VQA flow: if image attached, force image flow
    is_vqa_flow = False
    if isinstance(image_input_data, str):
        is_vqa_flow = bool(image_input_data)

    llm_input_message = original_message
    if is_vqa_flow:
        processed_message, vqa_success = process_image(image_input_data, original_message)
+        history[last_user_index]['content'] = f"[IMAGE RECEIVED] {moderate_sanitize_for_ui(original_message)}"
+        # ensure that LLM prompt includes VQA analysis and the user message
        llm_input_message = processed_message

    # build prompt

        prompt += f"USER: {content}\n"
    prompt += f"USER: {llm_input_message}\nLUNA: "

+    # append assistant entry now
    assistant_initial_text = "✨ Luna is starting to think..."
    history.append({"role": "assistant", "content": assistant_initial_text})

    yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
    time.sleep(0.12)

        )
    except Exception as e:
        err = f"β Error generating response: {e}"
+        history[-1]['content'] = moderate_sanitize_for_ui(err)
        yield history, False, err, gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return

+    # stream tokens; moderately sanitize and cap
    try:
        for output in stream:
            iter_count += 1

                print("Stream truncated by char limit.")
                break

            current_intent, current_hint, interim = get_intent_status(full_response, is_vqa_flow and vqa_success)
+            interim_ui = moderate_sanitize_for_ui(interim)

            if not interim_ui:
                interim_ui = "✨ Luna is forming a reply..."
            history[-1]['content'] = interim_ui

    except Exception as e:
        _, _, salvage = get_intent_status(full_response, is_vqa_flow and vqa_success)
+        salvage_ui = moderate_sanitize_for_ui(salvage) or f"⚠️ Streaming interrupted: {e}"
        history[-1]['content'] = salvage_ui
        yield history, False, f"⚠️ Streaming interrupted: {e}", gr.update(interactive=True), gr.update(value="β", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
        return

    # post-process
    file_download_path = None
    _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
+    content_for_tool = moderate_sanitize_for_ui(content_for_tool)

    if current_intent == "image_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:

        history, file_download_path = generate_file_content(content_for_tool, history, "ppt")

    elif current_intent == "open_google":
+        final_text = (content_for_tool or "").strip() + "\n\nπ **Action:** [Search Google](https://www.google.com/search?q=" + re.sub(r'\s+', '+', moderate_sanitize_for_ui(original_message)) + ")"
+        history[-1]['content'] = moderate_sanitize_for_ui(final_text)

    elif current_intent == "open_camera":
        final_text = (content_for_tool or "").strip() + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
+        history[-1]['content'] = moderate_sanitize_for_ui(final_text)

    else:

        final_response_content = check_confidence_and_augment(full_response, original_message)
        history[-1]['content'] = final_response_content

    if not history[-1]['content'] or not str(history[-1]['content']).strip():
        history[-1]['content'] = "Sorry β I couldn't produce a useful response. Could you rephrase or add details?"

    new_visibility = not current_visibility
    return new_visibility, gr.update(visible=new_visibility), gr.update(visible=False), gr.update(value="⬆️" if new_visibility else "β")

+
def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
    has_text = bool(user_message and user_message.strip())
    has_image = False

    if not has_text and not has_image:
        return user_message, chat_history

    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
        return user_message, chat_history

    user_message_to_add = "Analyzing Staged Media." if (not has_text and has_image) else user_message.strip()
+    chat_history.append({"role": "user", "content": moderate_sanitize_for_ui(user_message_to_add)})
    return "", chat_history

+
def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
    if file_path:
        return file_path, f"π File staged: {os.path.basename(file_path)}. Click send (βοΈ).", gr.update(value="", interactive=True), gr.update(interactive=False)
    return None, "File upload cancelled.", gr.update(value="", interactive=True), gr.update(interactive=False)

+
def clear_staged_media() -> gr.update:
    return gr.update(value=None)

+
def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, gr.update]:
    if not history or not history[-1]['content']:
        return history, "Error: No final response to check.", gr.update(visible=False)

        return history, "Error: Could not find query.", gr.update(visible=False)
    web_results = web_search_tool(last_user_prompt)
    new_history = list(history)
+    new_history[-1]['content'] += "\n\n" + moderate_sanitize_for_ui(web_results)
    return new_history, "β Double-checked with web facts.", gr.update(visible=False)

+
def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]], Any, gr.update, gr.update, gr.update, gr.update, gr.update]:
    _, chat_history = user_turn(user_message, chat_history, staged_image_input)
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] == "":

    return "", chat_history, staged_image_input, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(value="📸 Capturing in 3 seconds...", interactive=False), gr.update(value="β")

# --- GRADIO UI ---
+with gr.Blocks(theme=gr.themes.Soft(), title="Prototype") as demo:
    stop_signal = gr.State(value=False)
    is_voice_chat = gr.State(value=False)
    staged_image = gr.State(value=None)
    menu_visible_state = gr.State(value=False)

+    gr.HTML("<h1 style='text-align: center; color: #4B0082;'>π Prototype</h1>")

    hint_box = gr.Textbox(value="Ask anything", lines=1, show_label=False, interactive=False, placeholder="Luna's Action...", visible=True)
    file_download_output = gr.File(label="Generated File", visible=False)