import os
import io
import base64
import urllib.request
import ssl
import time
from typing import Optional, List

import numpy as np
import cv2
from ultralytics import YOLO
import easyocr
from PIL import Image
import pillow_heif
from fastapi import FastAPI, HTTPException, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Writable cache/config locations (on HF Spaces only /tmp is writable)
os.environ.setdefault("YOLO_CONFIG_DIR", "/tmp/Ultralytics")
os.environ.setdefault("HF_HOME", "/tmp/.cache/huggingface")
os.makedirs(os.environ["YOLO_CONFIG_DIR"], exist_ok=True)

# Cap BLAS/OpenMP thread pools so CPU inference doesn't oversubscribe the container
os.environ.setdefault("OMP_NUM_THREADS", "2")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "2")
os.environ.setdefault("MKL_NUM_THREADS", "2")

import torch

torch.set_num_threads(2)

TMP_DIR = "/tmp"
paths = [
    f"{TMP_DIR}/Ultralytics",
    f"{TMP_DIR}/.EasyOCR",
    f"{TMP_DIR}/.EasyOCR/user_network",
    f"{TMP_DIR}/mplconfig",
]
for p in paths:
    os.makedirs(p, exist_ok=True)
# Point matplotlib at the writable config dir created above
os.environ.setdefault("MPLCONFIGDIR", f"{TMP_DIR}/mplconfig")

# Register the HEIC/HEIF opener so PIL can decode iPhone photos
pillow_heif.register_heif_opener()

from huggingface_hub import hf_hub_download

# --- Ultralytics-compatible weights (YOLOv11) ---
REPO_ID = "morsetechlab/yolov11-license-plate-detection"
FILENAME = "license-plate-finetune-v1n.pt"  # or v1s/v1m/v1l/v1x
WEIGHTS = hf_hub_download(repo_id=REPO_ID, filename=FILENAME)
yolo = YOLO(WEIGHTS)

# EasyOCR, with GPU when available
reader = easyocr.Reader(
    ['en'],
    gpu=torch.cuda.is_available(),
    model_storage_directory="/tmp/.EasyOCR",
)

ALLOW = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"


def clamp(v, lo, hi):
    return max(lo, min(hi, v))


def expand_box(xyxy, w, h, pad_ratio=0.10):
    """Grow a detection box by pad_ratio on each side, clamped to the image bounds."""
    x1, y1, x2, y2 = [int(v) for v in xyxy]
    bw, bh = x2 - x1, y2 - y1
    px, py = int(bw * pad_ratio), int(bh * pad_ratio)
    nx1 = clamp(x1 - px, 0, w - 1)
    ny1 = clamp(y1 - py, 0, h - 1)
    nx2 = clamp(x2 + px, 0, w - 1)
    ny2 = clamp(y2 + py, 0, h - 1)
    return nx1, ny1, nx2, ny2


def ensure_min_size(img_bgr, target_long=320):
    """Upscale so the longer side is at least target_long px (helps OCR on small crops)."""
    h, w = img_bgr.shape[:2]
    m = max(h, w)
    if m < target_long:
        scale = target_long / float(m)
        nh, nw = int(round(h * scale)), int(round(w * scale))
        img_bgr = cv2.resize(img_bgr, (nw, nh), interpolation=cv2.INTER_CUBIC)
    return img_bgr


def preproc_adaptive(plate_bgr):
    img = ensure_min_size(plate_bgr)  # guarantee a workable size
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 50, 50)
    th = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 5
    )
    # optional: close thin gaps in the character strokes
    k = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, k, iterations=1)
    return th


def preproc_clahe_otsu(plate_bgr):
    img = ensure_min_size(plate_bgr)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    eq = clahe.apply(gray)
    _, th = cv2.threshold(eq, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    k = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, k, iterations=1)
    return th


def read_easy(img, allow=ALLOW):
    """Run EasyOCR over a preprocessed crop and return the best (text, confidence)."""
    out = reader.readtext(img, detail=1, allowlist=allow)
    cands = []
    for _, text, score in out:
        t = "".join(c for c in (text or "").upper() if c in allow)
        if len(t) >= 4:
            cands.append((t, float(score)))
    if not cands:
        return "", 0.0
    cands.sort(key=lambda x: (x[1], len(x[0])), reverse=True)
    return cands[0]


# Note: not called anywhere below; preproc_adaptive covers the same steps plus the
# morphological close, so this earlier variant is kept only for reference.
def preprocess_for_ocr(plate_bgr):
    img = plate_bgr.copy()
    h, w = img.shape[:2]
    if max(h, w) < 160:
        img = cv2.resize(img, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 50, 50)
    th = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                               cv2.THRESH_BINARY, 31, 5)
    return th
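
# A minimal local sanity-check helper (a sketch, not part of the API): it runs both
# preprocessing paths over a crop loaded from disk and prints what EasyOCR reads from
# each, to illustrate how preproc_adaptive / preproc_clahe_otsu feed read_easy.
# The helper name and "plate_crop.jpg" path are hypothetical.
def _debug_read_crop(path="plate_crop.jpg"):
    crop = cv2.imread(path)
    if crop is None:
        raise FileNotFoundError(path)
    for name, pre in (("adaptive", preproc_adaptive), ("clahe_otsu", preproc_clahe_otsu)):
        text, score = read_easy(pre(crop))
        print(f"{name}: {text!r} ({score:.2f})")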
def ocr_plate(plate_bgr):
    # 1) adaptive threshold
    t, s = read_easy(preproc_adaptive(plate_bgr))
    if t:
        return t, s
    # 2) CLAHE + Otsu (fallback)
    return read_easy(preproc_clahe_otsu(plate_bgr))


def draw_box_text(img, xyxy, text, color=(0, 255, 0)):
    x1, y1, x2, y2 = [int(v) for v in xyxy]
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    if text:
        tsize = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        cv2.rectangle(img, (x1, y1 - tsize[1] - 6), (x1 + tsize[0] + 4, y1), color, -1)
        cv2.putText(img, text, (x1 + 2, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2, cv2.LINE_AA)


def detect_plates_bgr(bgr, conf=0.25, iou=0.45, max_det=1):
    # imgsz=512 is a good sweet spot on CPU
    res = yolo.predict(bgr, conf=conf, iou=iou, imgsz=512, max_det=max_det, verbose=False)[0]
    boxes = res.boxes.xyxy.cpu().numpy() if res.boxes is not None else np.empty((0, 4))
    confs = res.boxes.conf.cpu().numpy() if res.boxes is not None else np.empty((0,))
    return boxes, confs


def run_on_image_bgr(bgr, conf=0.25, iou=0.45, with_ocr=True, annotate=True, max_plates=1):
    h, w = bgr.shape[:2]
    vis = bgr.copy()
    t0 = time.time()
    boxes, confs = detect_plates_bgr(bgr, conf, iou, max_det=max_plates)
    idx = np.argsort(-confs)[:max_plates]
    boxes, confs = boxes[idx], confs[idx]
    detections = []
    for xyxy, c in zip(boxes, confs):
        x1, y1, x2, y2 = expand_box(xyxy, w, h, pad_ratio=0.10)
        crop = bgr[y1:y2, x1:x2]
        txt, s = ("", 0.0)
        # don't spend OCR time on a weak box
        if with_ocr and crop.size and float(c) >= 0.55:
            txt, s = ocr_plate(crop)
        if annotate:
            label = f"{txt or 'plate'} {c:.2f}"
            draw_box_text(vis, (x1, y1, x2, y2), label)
        detections.append({
            "box_xyxy": [x1, y1, x2, y2],
            "det_conf": float(c),
            "ocr_text": txt,
            "ocr_conf": float(s),
        })
    dt_ms = int((time.time() - t0) * 1000)
    return vis, detections, (w, h), dt_ms


def bgr_to_jpeg_base64(bgr):
    ok, buf = cv2.imencode(".jpg", bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
    if not ok:
        return None
    return base64.b64encode(buf.tobytes()).decode("ascii")


def pil_to_bgr(pil_img: Image.Image) -> np.ndarray:
    if pil_img.mode not in ("RGB", "RGBA"):
        pil_img = pil_img.convert("RGB")
    arr = np.array(pil_img)
    if arr.ndim == 2:
        arr = np.stack([arr] * 3, axis=-1)
    if arr.shape[2] == 4:
        arr = arr[:, :, :3]
    return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)


def decode_bytes_to_bgr(data: bytes, content_type: str = "") -> np.ndarray:
    # 1) OpenCV first (fast path)
    arr = np.frombuffer(data, np.uint8)
    bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if bgr is not None:
        return bgr
    # 2) PIL fallback (HEIC supported via pillow-heif)
    try:
        with Image.open(io.BytesIO(data)) as im:
            return pil_to_bgr(im)
    except Exception as e:
        raise ValueError(f"Could not decode the image ({content_type}): {e}")


def load_image_from_url(url: str) -> np.ndarray:
    # NOTE: this disables TLS certificate verification for the whole process
    ssl._create_default_https_context = ssl._create_unverified_context
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(req, timeout=20) as r:
        data = r.read()
        return decode_bytes_to_bgr(data, content_type=r.headers.get("Content-Type", ""))


def load_image_from_b64(b64_or_data_url: str) -> np.ndarray:
    s = b64_or_data_url
    if s.startswith("data:"):
        s = s.split(",", 1)[1]
    raw = base64.b64decode(s)
    return decode_bytes_to_bgr(raw, content_type="base64")


# --- FastAPI ---
app = FastAPI(title="Plates API (HF Space)")

ALLOWED = [
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "https://www.omar-cruz.com",
    "https://omar-cruz.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED,
    allow_origin_regex=r"^https?://([a-z0-9-]+\.)*hf\.space$",
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
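
# End-to-end sketch for local debugging (hypothetical helper, not wired to any route):
# fetch an image by URL, run detection + OCR, and save an annotated copy. Assumes the
# URL points at a regular JPEG/PNG; "out.jpg" is an arbitrary output name.
def _debug_detect_url(url: str, out_path: str = "out.jpg"):
    bgr = load_image_from_url(url)
    vis, dets, (w, h), dt_ms = run_on_image_bgr(bgr, with_ocr=True, annotate=True)
    cv2.imwrite(out_path, vis)
    print(f"{len(dets)} plate(s) in {w}x{h} px, {dt_ms} ms:", dets)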
class Detection(BaseModel):
    box_xyxy: List[int]
    det_conf: float
    ocr_text: str = ""
    ocr_conf: float = 0.0


class DetectResponse(BaseModel):
    detections: List[Detection]
    count: int
    width: int
    height: int
    time_ms: int
    annotated_image_b64: Optional[str] = None


class DetectRequest(BaseModel):
    image_url: Optional[str] = None
    image_b64: Optional[str] = None
    conf: float = Field(0.25, ge=0.05, le=0.95)
    iou: float = Field(0.45, ge=0.1, le=0.9)
    ocr: bool = True
    return_image: bool = False


@app.get("/")
def health():
    return {
        "status": "ok",
        "service": "plates-api",
        "model": os.path.basename(WEIGHTS),
        "ocr_gpu": torch.cuda.is_available(),
        "allow_origins": ALLOWED,
    }


@app.post("/detect", response_model=DetectResponse)
def detect(req: DetectRequest):
    try:
        if not req.image_url and not req.image_b64:
            raise HTTPException(400, "Provide 'image_url' or 'image_b64'.")
        bgr = load_image_from_url(req.image_url) if req.image_url else load_image_from_b64(req.image_b64)
        vis, dets, (w, h), dt_ms = run_on_image_bgr(
            bgr, conf=req.conf, iou=req.iou, with_ocr=req.ocr, annotate=req.return_image
        )
        b64 = bgr_to_jpeg_base64(vis) if req.return_image else None
        return DetectResponse(
            detections=dets, count=len(dets), width=w, height=h,
            time_ms=dt_ms, annotated_image_b64=b64
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Error processing the image: {e}")


@app.post("/detect_upload", response_model=DetectResponse)
async def detect_upload(
    image: UploadFile = File(...),
    conf: float = Form(0.25),
    iou: float = Form(0.45),
    ocr: bool = Form(True),
    return_image: bool = Form(False),
):
    try:
        data = await image.read()
        if not data:
            raise HTTPException(400, "Empty file.")
        try:
            bgr = decode_bytes_to_bgr(data, content_type=image.content_type or image.filename)
        except ValueError:
            # reaching here means neither cv2 nor PIL could decode it
            raise HTTPException(415, f"Unsupported format: {image.content_type or image.filename}")
        vis, dets, (w, h), dt_ms = run_on_image_bgr(
            bgr, conf=conf, iou=iou, with_ocr=ocr, annotate=return_image
        )
        b64 = bgr_to_jpeg_base64(vis) if return_image else None
        return DetectResponse(
            detections=dets, count=len(dets), width=w, height=h,
            time_ms=dt_ms, annotated_image_b64=b64
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Error processing the image: {e}")


@app.on_event("startup")
def _warmup():
    # run one dummy pass through each model so the first real request isn't slow
    dummy = np.zeros((512, 512, 3), dtype=np.uint8)
    try:
        _ = yolo.predict(dummy, conf=0.25, iou=0.45, imgsz=512, verbose=False)
    except Exception as e:
        print("Warmup YOLO:", e)
    try:
        _ = reader.readtext(cv2.cvtColor(dummy, cv2.COLOR_BGR2GRAY), detail=0)
    except Exception as e:
        print("Warmup EasyOCR:", e)
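
# Local dev entrypoint (a sketch, under assumptions): on a HF Space the server is
# normally launched by the platform, so this block only matters when running the file
# directly on your own machine. Port 7860 matches the HF Spaces default.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)

# Example requests (hypothetical host and image names):
#   curl -X POST http://localhost:7860/detect \
#        -H "Content-Type: application/json" \
#        -d '{"image_url": "https://example.com/car.jpg", "return_image": false}'
#   curl -X POST http://localhost:7860/detect_upload \
#        -F "image=@car.jpg" -F "return_image=true"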