| """ |
🧬 Darwin-35B-A3B-Opus Q8 GGUF — llama-cpp-python Direct Serving
Dedicated GPU · OpenAI-compatible streaming · custom frontend
| """ |
import subprocess
import sys

print(f"[BOOT] Python {sys.version}", flush=True)

# Make llama-cpp-python importable: use the installed package when present,
# otherwise pip-install it (preferring binary wheels, with the cu124 wheel
# index as an extra source) and import it again.
try:
    from llama_cpp import Llama
except ImportError:
    print("[BOOT] Installing llama-cpp-python with CUDA...", flush=True)
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "llama-cpp-python", "--no-cache-dir", "--prefer-binary",
        "--extra-index-url", "https://abetlen.github.io/llama-cpp-python/whl/cu124",
    ])
    from llama_cpp import Llama
    print("[BOOT] llama-cpp-python installed β", flush=True)
else:
    print("[BOOT] llama-cpp-python already installed", flush=True)
|
|
| import base64, os, re, json, io |
| from typing import Generator, Optional |
|
|
| import gradio as gr |
| print(f"[BOOT] gradio {gr.__version__}", flush=True) |
|
|
| import requests, httpx, uvicorn |
| from fastapi import FastAPI, Request |
| from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse |
| from urllib.parse import urlencode |
| import pathlib, secrets |
|
|
| import urllib3 |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
|
| |
| |
| |
# Hugging Face repository / file identifiers and the display name of the model.
REPO_ID = "FINAL-Bench/Darwin-35B-A3B-Opus-Q8-GGUF"
GGUF_FILE = "merged_109838c2-q8_0-00001-of-00003.gguf"
MODEL_NAME = "Darwin-35B-A3B-Opus-Q8"

# Descriptive capabilities plus the generation limits; "max_tokens" and
# "temp_max" are the upper clamps applied in generate_reply.
MODEL_CAP = dict(
    arch="MoE",
    active="3B / 35B total",
    ctx="262K",
    thinking=True,
    vision=False,
    max_tokens=16384,
    temp_max=1.5,
)

# System-prompt presets keyed by task name.
PRESETS = dict(
    general="You are Darwin-35B-A3B-Opus, a highly capable reasoning model created by VIDRAFT via evolutionary merge. Think step by step for complex questions.",
    code="You are an expert software engineer. Write clean, efficient, well-commented code. Explain your approach before writing. Use modern best practices.",
    math="You are a world-class mathematician. Break problems step-by-step. Show full working. Use LaTeX where helpful.",
    creative="You are a brilliant creative writer. Be imaginative, vivid, and engaging. Adapt tone and style to the request.",
    translate="You are a professional translator fluent in 201 languages. Provide accurate, natural-sounding translations with cultural context.",
    research="You are a rigorous research analyst. Provide structured, well-reasoned analysis. Identify assumptions and acknowledge uncertainty.",
)
|
|
| |
| |
| |
def detect_gpu_layers() -> int:
    """Choose an ``n_gpu_layers`` value from the total VRAM of CUDA device 0.

    Returns:
        ``-1`` for >= 40 GB, ``28`` for >= 24 GB, ``18`` for >= 16 GB and
        ``10`` below that; ``0`` when no CUDA device is available or when
        probing raises for any reason (including torch not being installed).
    """
    try:
        import torch

        if not torch.cuda.is_available():
            print("[GPU] No CUDA device found, CPU-only mode", flush=True)
            return 0

        device = torch.cuda.get_device_properties(0)
        # The byte count is read from whichever of the two attribute names exists.
        total_bytes = getattr(device, 'total_memory', 0) or getattr(device, 'total_mem', 0)
        vram_gb = total_bytes / (1024**3)
        print(f"[GPU] {torch.cuda.get_device_name(0)} β {vram_gb:.1f} GB VRAM", flush=True)

        # (minimum GB, layer count), checked from the largest tier down.
        for min_gb, layer_count in ((40, -1), (24, 28), (16, 18)):
            if vram_gb >= min_gb:
                return layer_count
        return 10
    except Exception as e:
        print(f"[GPU] Detection failed: {e}, using CPU", flush=True)
        return 0
|
|
# Runtime knobs; environment variables take precedence over auto-detection.
# The GPU probe imports torch and prints device info, so it is only run when
# N_GPU_LAYERS is not supplied (previously it ran even under an override,
# because it was evaluated eagerly as the getenv default).
_n_gpu_layers_env = os.getenv("N_GPU_LAYERS")
N_GPU_LAYERS = int(_n_gpu_layers_env) if _n_gpu_layers_env is not None else detect_gpu_layers()
N_CTX = int(os.getenv("N_CTX", "32768"))


print(f"[MODEL] Loading {REPO_ID} ...", flush=True)
print(f"[MODEL] n_gpu_layers={N_GPU_LAYERS}, n_ctx={N_CTX}", flush=True)
|
|
| |
from huggingface_hub import hf_hub_download

# File names of the three GGUF shards, in order (00001 .. 00003).
GGUF_SHARDS = [
    f"merged_109838c2-q8_0-{index:05d}-of-00003.gguf" for index in range(1, 4)
]

# Fetch every shard from the Hub, remembering the local path of each one.
shard_paths = []
for shard in GGUF_SHARDS:
    print(f"[MODEL] Downloading {shard} ...", flush=True)
    p = hf_hub_download(repo_id=REPO_ID, filename=shard)
    shard_paths.append(p)
    print(f"[MODEL] β {p}", flush=True)

# Only the first shard path is handed to Llama.
# NOTE(review): presumably llama.cpp locates the remaining split files next
# to the first one — confirm.
llm = Llama(
    model_path=shard_paths[0],
    n_gpu_layers=N_GPU_LAYERS,
    n_ctx=N_CTX,
    verbose=True,
)
print(f"[MODEL] {MODEL_NAME} loaded β", flush=True)
|
|
| |
| |
| |
def parse_think_blocks(text: str) -> tuple[str, str]:
    """Split the first ``<think>...</think>`` block off *text*.

    Returns:
        ``(reasoning, remainder)``: the stripped contents of the first think
        block and the stripped text that follows it (anything before the
        block is dropped). When no complete block is present the result is
        ``("", text)`` with *text* unchanged.
    """
    match = re.search(r"<think>(.*?)</think>\s*", text, re.DOTALL)
    if match is None:
        return "", text
    reasoning = match.group(1).strip()
    remainder = text[match.end():].strip()
    return reasoning, remainder
|
|
| def _is_thinking_line(line: str) -> bool: |
| l = line.strip() |
| if not l: |
| return True |
| think_starts = [ |
| "The user", "the user", "This is", "this is", "I should", "I need to", |
| "Let me", "let me", "My task", "my task", "I'll ", "I will", |
| "Since ", "since ", "Now,", "now,", "So,", "so,", "First,", "first,", |
| "Okay", "okay", "Alright", "Hmm", "Wait", "Actually", |
| "The question", "the question", "The input", "the input", |
| "The request", "the request", "The prompt", "the prompt", |
| "Thinking Process", "Thinking process", "**Thinking", |
| "Step ", "step ", "Approach:", "Analysis:", "Reasoning:", |
| "1. **", "2. **", "3. **", "4. **", "5. **", |
| ] |
| for s in think_starts: |
| if l.startswith(s): |
| return True |
| if l.startswith(("- ", "* ", "β ")) and any(c.isascii() and c.isalpha() for c in l[:20]): |
| if not any(ord(c) > 0x1100 for c in l[:30]): |
| return True |
| return False |
|
|
def _split_thinking_answer(raw: str) -> tuple:
    """Split untagged output into ``(thinking, answer)`` by line heuristics.

    The answer starts at the first line that is not a thinking line and that
    either has a code point above U+1100 within its first 10 stripped
    characters, or sits at index > 2 directly after two blank lines. When no
    such line exists, or it is the very first line, ``("", raw)`` is returned.
    """
    lines = raw.split("\n")
    answer_start = -1
    for idx, current in enumerate(lines):
        if _is_thinking_line(current):
            continue
        starts_wide = any(ord(ch) > 0x1100 for ch in current.strip()[:10])
        after_blank_gap = idx > 2 and all(
            not previous.strip() for previous in lines[idx - 2:idx]
        )
        if starts_wide or after_blank_gap:
            answer_start = idx
            break

    if answer_start <= 0:
        return "", raw
    thinking = "\n".join(lines[:answer_start]).strip()
    answer = "\n".join(lines[answer_start:]).strip()
    return thinking, answer
|
|
def format_response(raw: str) -> str:
    """Render raw model output for display, folding reasoning away.

    * A complete ``<think>`` block becomes a collapsible ``<details>``
      section followed by the answer.
    * An opened but unclosed ``<think>`` yields a progress placeholder with
      the number of characters after the tag.
    * Untagged output longer than 20 characters whose first line looks like
      reasoning is split heuristically; otherwise *raw* is returned as is.
    """
    # NOTE(review): the non-ASCII glyphs in these literals look mis-encoded
    # in the source; they are kept exactly as found — confirm the intent.
    reasoning, answer = parse_think_blocks(raw)
    if reasoning:
        header = "<details>\n<summary>π§ Reasoning Chain β click to expand</summary>\n\n"
        return header + f"{reasoning}\n\n</details>\n\n{answer}"

    if "<think>" in raw and "</think>" not in raw:
        # 7 == len("<think>"): count only the characters after the tag.
        think_len = len(raw) - raw.index("<think>") - 7
        return f"π§ Reasoning... ({think_len} chars)"

    stripped = raw.strip()
    first_line = stripped.split("\n")[0] if stripped else ""
    if not (_is_thinking_line(first_line) and len(raw) > 20):
        return raw

    thinking, answer = _split_thinking_answer(raw)
    if thinking and answer:
        header = f"<details>\n<summary>π§ Reasoning Chain ({len(thinking)} chars)</summary>\n\n"
        return header + f"{thinking}\n\n</details>\n\n{answer}"
    if thinking:
        return f"π§ Reasoning... ({len(raw)} chars)"
    return raw
|
|
| |
| |
| |
# Matches the collapsible reasoning section that format_response renders.
_REASONING_DETAILS_RE = re.compile(
    r"<details>\s*<summary>[^<]*Reasoning Chain[^<]*</summary>.*?</details>\s*",
    re.DOTALL,
)


def _content_to_text(content) -> str:
    """Flatten a chat ``content`` value (string or list of typed parts) to text."""
    if isinstance(content, list):
        return " ".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return str(content)


def _strip_reasoning(text: str) -> str:
    """Drop reasoning (``<think>`` block or rendered ``<details>``) from an assistant turn."""
    _, answer = parse_think_blocks(text)
    # History holds what was displayed, i.e. format_response output, so the
    # reasoning may arrive as a <details> section rather than <think> tags.
    without_details, removed = _REASONING_DETAILS_RE.subn("", answer, count=1)
    return without_details.strip() if removed else answer


def _history_to_messages(history: list) -> list[dict]:
    """Convert chat history (role dicts or ``[user, assistant]`` pairs) to chat messages."""
    messages: list[dict] = []
    for turn in history or []:
        if isinstance(turn, dict):
            role = turn.get("role", "")
            text = _content_to_text(turn.get("content") or "")
            if role == "user":
                messages.append({"role": "user", "content": text})
            elif role == "assistant":
                messages.append({"role": "assistant", "content": _strip_reasoning(text)})
            continue

        try:
            user_part = turn[0] or None
            assistant_part = turn[1] if len(turn) > 1 else None
        except (IndexError, TypeError):
            continue
        user_text = None if user_part is None else _content_to_text(user_part)
        assistant_text = None if assistant_part is None else _content_to_text(assistant_part)
        if user_text:
            messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": _strip_reasoning(assistant_text)})
    return messages


def generate_reply(
    message: str,
    history: list,
    thinking_mode: str,
    image_input,
    system_prompt: str,
    max_new_tokens: int,
    temperature: float,
    top_p: float,
) -> Generator[str, None, None]:
    """Stream a chat completion from ``llm``, yielding the formatted text so far.

    Args:
        message: The new user message.
        history: Earlier turns, as role/content dicts or ``[user, assistant]``
            pairs. Reasoning is stripped from assistant turns before reuse.
        thinking_mode: Accepted to match the UI inputs; not used here.
        image_input: Accepted to match the UI inputs; not used here.
        system_prompt: Optional system message; blank or ``None`` is skipped.
        max_new_tokens: Token budget, capped at ``MODEL_CAP["max_tokens"]``.
        temperature: Sampling temperature, capped at ``MODEL_CAP["temp_max"]``;
            values at or below 0.01 are sent as 0.0.
        top_p: Nucleus sampling parameter, passed through as a float.

    Yields:
        ``format_response`` of the accumulated output after each token, once
        more when the stream ends, or a notice/error string if nothing was
        produced or generation raised.
    """
    max_new_tokens = min(int(max_new_tokens), MODEL_CAP["max_tokens"])
    temperature = min(float(temperature), MODEL_CAP["temp_max"])
    sampling_temperature = temperature if temperature > 0.01 else 0.0

    messages: list[dict] = []
    system_text = (system_prompt or "").strip()
    if system_text:
        messages.append({"role": "system", "content": system_text})
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": message})

    print(f"[GEN] msgs={len(messages)}, max_new={max_new_tokens}, temp={temperature}", flush=True)

    # NOTE(review): the non-ASCII text in the literals below looks mis-encoded
    # in the source; it is kept exactly as found — confirm the intent.
    try:
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_new_tokens,
            temperature=sampling_temperature,
            top_p=float(top_p),
            stream=True,
        )

        raw = ""
        for chunk in stream:
            # A chunk with a missing or empty "choices" list carries no text;
            # indexing it directly would abort the whole stream.
            choices = chunk.get("choices") or [{}]
            token = choices[0].get("delta", {}).get("content", "")
            if token:
                raw += token
                yield format_response(raw)

        if raw:
            print(f"[GEN] Done β {len(raw)} chars", flush=True)
            yield format_response(raw)
        else:
            yield "**β οΈ λͺ¨λΈμ΄ λΉ μλ΅μ λ°ννμ΅λλ€.** λ€μ μλν΄ μ£ΌμΈμ."

    except Exception as e:
        # Top-level boundary for the UI: report the failure in the chat.
        print(f"[GEN] Error: {e}", flush=True)
        yield f"**β Generation error:** `{e}`"
|
|
|
|
| |
| |
| |
with gr.Blocks(title=MODEL_NAME) as gradio_demo:
    # Every control is hidden; they exist so ChatInterface can pass their
    # values to generate_reply as additional inputs.
    _fast_mode_label = "β‘ Fast Mode (direct answer)"
    thinking_toggle = gr.Radio(
        choices=[
            _fast_mode_label,
            "π§ Thinking Mode (chain-of-thought reasoning)",
        ],
        value=_fast_mode_label,
        visible=False,
    )
    image_input = gr.Textbox(value="", visible=False)
    system_prompt = gr.Textbox(value=PRESETS["general"], visible=False)
    # Upper bounds come from MODEL_CAP so they cannot drift from the clamps
    # that generate_reply applies (same values as before: 16384 and 1.5).
    max_new_tokens = gr.Slider(
        minimum=64, maximum=MODEL_CAP["max_tokens"], value=4096, visible=False,
    )
    temperature = gr.Slider(
        minimum=0.0, maximum=MODEL_CAP["temp_max"], value=0.6, visible=False,
    )
    top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, visible=False)

    gr.ChatInterface(
        fn=generate_reply,
        api_name="chat",
        additional_inputs=[
            thinking_toggle, image_input,
            system_prompt, max_new_tokens, temperature, top_p,
        ],
    )
|
|
| |
| |
| |
# FastAPI application that hosts the custom frontend, OAuth and helper APIs.
fapp = FastAPI()
# In-memory session store: session id -> user info dict.
SESSIONS: dict[str, dict] = {}
# Frontend page served at "/", expected next to this file.
HTML = pathlib.Path(__file__).parent / "index.html"

# OAuth settings, read from the environment.
CLIENT_ID = os.getenv("OAUTH_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET", "")
SPACE_HOST = os.getenv("SPACE_HOST", "localhost:7860")
# NOTE(review): the scheme is always https, including for the localhost
# default — confirm that is intended for local runs.
REDIRECT_URI = f"https://{SPACE_HOST}/login/callback"
SCOPES = os.getenv("OAUTH_SCOPES", "openid profile")

# Hugging Face OAuth endpoints.
HF_AUTH_URL = "https://huggingface.co/oauth/authorize"
HF_TOKEN_URL = "https://huggingface.co/oauth/token"
HF_USER_URL = "https://huggingface.co/oauth/userinfo"

print(f"[OAuth] CLIENT_ID set: {bool(CLIENT_ID)}")
print(f"[OAuth] SPACE_HOST: {SPACE_HOST}")
|
|
def _sid(req: Request) -> Optional[str]:
    """Return the value of the ``mc_session`` cookie on *req*, or ``None``."""
    session_id = req.cookies.get("mc_session")
    return session_id
def _user(req: Request) -> Optional[dict]:
    """Return the stored session data for *req*, or ``None`` if there is none."""
    sid = _sid(req)
    if not sid:
        return None
    return SESSIONS.get(sid)
|
|
@fapp.get("/")
async def root(request: Request):
    """Serve ``index.html``, or a short placeholder page when it is missing."""
    if not HTML.exists():
        return HTMLResponse("<h2>index.html missing</h2>")
    return HTMLResponse(HTML.read_text(encoding="utf-8"))
|
|
@fapp.get("/oauth/user")
async def oauth_user(request: Request):
    """Return the current session's user info, or a 401 payload when logged out."""
    session = _user(request)
    if session:
        return JSONResponse(session)
    return JSONResponse({"logged_in": False}, status_code=401)
|
|
# Short-lived cookie that carries the OAuth ``state`` between login and callback.
_OAUTH_STATE_COOKIE = "mc_oauth_state"


@fapp.get("/oauth/login")
async def oauth_login(request: Request):
    """Redirect the browser to the Hugging Face authorization page.

    A random ``state`` is sent to the provider and also stored in a cookie so
    that the callback can check the response belongs to this browser.
    Redirects to ``/?oauth_error=not_configured`` when no client id is set.
    """
    if not CLIENT_ID:
        return RedirectResponse("/?oauth_error=not_configured")
    state = secrets.token_urlsafe(16)
    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPES,
        "state": state,
    }
    resp = RedirectResponse(f"{HF_AUTH_URL}?{urlencode(params)}", status_code=302)
    resp.set_cookie(
        _OAUTH_STATE_COOKIE, state,
        httponly=True, samesite="lax", secure=True, max_age=600,
    )
    return resp


@fapp.get("/login/callback")
async def oauth_callback(request: Request, code: str = "", error: str = "", state: str = ""):
    """Finish the OAuth flow: verify state, exchange the code, open a session.

    Every failure (provider error, missing code, state mismatch, token or
    userinfo request failing) redirects to ``/?auth_error=1``. On success the
    user info is stored in ``SESSIONS`` and the ``mc_session`` cookie is set.
    """
    failure = RedirectResponse("/?auth_error=1")
    if error or not code:
        return failure

    # The state was generated at login but never checked; without this
    # comparison it gives no protection against forged callbacks.
    expected_state = request.cookies.get(_OAUTH_STATE_COOKIE, "")
    if not state or not expected_state or not secrets.compare_digest(
        state.encode(), expected_state.encode()
    ):
        return failure

    basic = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
    try:
        async with httpx.AsyncClient() as client:
            tok = await client.post(
                HF_TOKEN_URL,
                data={"grant_type": "authorization_code", "code": code, "redirect_uri": REDIRECT_URI},
                headers={"Accept": "application/json", "Authorization": f"Basic {basic}"},
            )
            if tok.status_code != 200:
                return failure
            access_token = tok.json().get("access_token", "")
            if not access_token:
                return failure
            uinfo = await client.get(HF_USER_URL, headers={"Authorization": f"Bearer {access_token}"})
            if uinfo.status_code != 200:
                return failure
            user = uinfo.json()
    except (httpx.HTTPError, ValueError):
        # Network failure or a non-JSON body: report it like any other
        # failed login instead of surfacing a 500.
        return failure

    sid = secrets.token_urlsafe(32)
    SESSIONS[sid] = {
        "logged_in": True,
        "username": user.get("preferred_username", user.get("name", "User")),
        "name": user.get("name", ""),
        "avatar": user.get("picture", ""),
        "profile": f"https://huggingface.co/{user.get('preferred_username', '')}",
    }
    resp = RedirectResponse("/")
    resp.set_cookie("mc_session", sid, httponly=True, samesite="lax", secure=True, max_age=60*60*24*7)
    resp.delete_cookie(_OAUTH_STATE_COOKIE)
    return resp
|
|
@fapp.get("/oauth/logout")
async def oauth_logout(request: Request):
    """Forget the caller's session, clear its cookie and redirect to ``/``."""
    sid = _sid(request)
    if sid:
        SESSIONS.pop(sid, None)
    resp = RedirectResponse("/")
    resp.delete_cookie("mc_session")
    return resp
|
|
@fapp.get("/health")
async def health():
    """Report liveness together with the model name and load settings."""
    payload = dict(status="ok", model=MODEL_NAME, gpu_layers=N_GPU_LAYERS, ctx=N_CTX)
    return payload
|
|
| |
BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")


@fapp.post("/api/search")
async def api_search(request: Request):
    """Proxy a web search to the Brave Search API.

    Expects a JSON body ``{"query": "..."}``. Responds with
    ``{"results": [{"title", "desc", "url"}, ...]}`` (at most five items),
    400 for an invalid body or an empty query, and 500 when the API key is
    missing or the upstream request fails.
    """
    try:
        body = await request.json()
    except ValueError:
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    raw_query = body.get("query", "") if isinstance(body, dict) else ""
    query = raw_query.strip() if isinstance(raw_query, str) else ""
    if not query:
        return JSONResponse({"error": "empty query"}, status_code=400)
    key = BRAVE_API_KEY
    if not key:
        return JSONResponse({"error": "BRAVE_API_KEY not set"}, status_code=500)
    try:
        # Use the async client: a blocking requests.get here would stall the
        # event loop (and every other request) for up to the full timeout.
        async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
            r = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers={"X-Subscription-Token": key, "Accept": "application/json"},
                params={"q": query, "count": 5},
            )
        r.raise_for_status()
        results = r.json().get("web", {}).get("results", [])
        items = [
            {
                "title": item.get("title", ""),
                "desc": item.get("description", ""),
                "url": item.get("url", ""),
            }
            for item in results[:5]
        ]
        return JSONResponse({"results": items})
    except Exception as e:
        # Endpoint boundary: report upstream failures as JSON.
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
| |
@fapp.post("/api/extract-pdf")
async def api_extract_pdf(request: Request):
    """Extract up to 8000 characters of text from a base64-encoded PDF.

    Expects a JSON body ``{"data": "<base64 or data URL>"}``. Uses PyMuPDF
    (``fitz``) when it can be imported; otherwise falls back to decoding the
    raw bytes and keeping only printable ASCII, Hangul and kana characters.
    Responds with ``{"text", "chars"}``, or ``{"error"}`` with status 500.
    """
    try:
        body = await request.json()
        b64 = body.get("data", "")
        if "," in b64:
            # Drop a "data:...;base64," style prefix.
            b64 = b64.split(",", 1)[1]
        pdf_bytes = base64.b64decode(b64)
        try:
            import fitz
        except ImportError:
            content = pdf_bytes.decode("utf-8", errors="ignore")
            text = re.sub(r'[^\x20-\x7E\n\r\uAC00-\uD7A3\u3040-\u309F\u30A0-\u30FF]', '', content)
        else:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
            try:
                text = "".join(page.get_text() + "\n" for page in doc)
            finally:
                # Release the document even when a page fails to parse.
                doc.close()
        text = text.strip()[:8000]
        return JSONResponse({"text": text, "chars": len(text)})
    except Exception as e:
        # Endpoint boundary: report any failure as JSON.
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
| |
| |
| |
# Serve the Gradio UI under /gradio on the same FastAPI application.
app = gr.mount_gradio_app(fapp, gradio_demo, path="/gradio")


if __name__ == "__main__":
    # Direct execution: announce the configuration, then serve on all
    # interfaces at port 7860.
    print(f"[BOOT] {MODEL_NAME} Β· llama-cpp Β· GPU layers: {N_GPU_LAYERS}", flush=True)
    uvicorn.run(app, host="0.0.0.0", port=7860)