"""TrashTrack Turbo: zero-shot recyclable classification API (SigLIP) for ESP32 clients."""

import os

# Pin thread counts before heavy libraries are imported, otherwise the
# model can oversubscribe CPU cores inside a single worker.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import io
import time

import numpy as np
import torch
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from PIL import Image, ImageOps
from transformers import AutoProcessor, AutoModel

# SigLIP (shape-optimized 400M, patch 14, 384px) for zero-shot classification.
MODEL_ID = "google/siglip-so400m-patch14-384"
device = "cuda" if torch.cuda.is_available() else "cpu"

# The fast image processor requires torchvision; fall back to the slow one
# when it is not installed.
try:
    import torchvision  # noqa: F401
    USE_FAST = True
except Exception:
    USE_FAST = False

print(f"🚀 Loading model {MODEL_ID} (use_fast={USE_FAST})...")

processor = AutoProcessor.from_pretrained(MODEL_ID, use_fast=USE_FAST)
model = AutoModel.from_pretrained(
    MODEL_ID,
    # Half precision on GPU halves memory and speeds up inference;
    # keep the default full precision on CPU.
    torch_dtype=torch.float16 if device == "cuda" else None,
).to(device).eval()
print("✅ Model loaded successfully.")

# Each recyclable class is described by an ensemble of prompt terms
# (Portuguese and English variants); their probabilities are averaged,
# which is more robust than a single prompt per class.
labels = {
    "plastico": [
        "plástico", "garrafa PET", "tampinha plástica",
        "sacola plástica", "plastic bottle",
    ],
    "papel": [
        "papel", "folha", "envelope de papel",
        "paper sheet", "paper wrapper",
    ],
    "metal": [
        "lata", "alumínio", "tampinha metálica",
        "metal cap", "can",
    ],
}


def _promptize(term: str) -> str:
    """Wrap a raw term in a prompt template that steers the model toward
    the centered object and away from the background."""
    return f"centered {term} on a white background; ignore the background; classify only the object"


# Flatten the class groups into one prompt list, preserving order so each
# score can be mapped back to its class by index.
texts = [_promptize(t) for group in labels.values() for t in group]
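# For illustration, the first generated prompt is:
#   _promptize("plástico")
#   -> "centered plástico on a white background; ignore the background; classify only the object"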


def crop_foreground_ignore_white(pil: Image.Image) -> Image.Image:
    """Crop the image to the bounding box of non-white pixels, so the
    classifier sees the object rather than the white tray background."""
    img = pil.convert("RGB")
    arr = np.array(img)
    r, g, b = arr[..., 0], arr[..., 1], arr[..., 2]
    whiteish = (r > 230) & (g > 230) & (b > 230)
    fg = ~whiteish

    # Too few foreground pixels: the mask is unreliable, so fall back to a
    # centered crop covering 80% of each dimension.
    if fg.sum() < 500:
        w, h = img.size
        cw, ch = int(w * 0.8), int(h * 0.8)
        left, top = (w - cw) // 2, (h - ch) // 2
        return img.crop((left, top, left + cw, top + ch))

    # Bounding box of the foreground mask.
    ys, xs = np.where(fg)
    y0, y1 = ys.min(), ys.max()
    x0, x1 = xs.min(), xs.max()

    # Pad the box by 3% on each side, clamped to the image bounds.
    py, px = int(0.03 * img.height), int(0.03 * img.width)
    y0 = max(0, y0 - py); y1 = min(img.height - 1, y1 + py)
    x0 = max(0, x0 - px); x1 = min(img.width - 1, x1 + px)

    return img.crop((x0, y0, x1 + 1, y1 + 1))
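# Sanity check (sketch): an all-white 100x100 canvas has no foreground
# pixels, so the fallback path returns the 80% center crop:
#   crop_foreground_ignore_white(Image.new("RGB", (100, 100), "white")).size
#   -> (80, 80)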


app = FastAPI(title="TrashTrack Turbo — ESP32 Compatible")


@app.get("/")
def root():
    return {"ok": True, "model": MODEL_ID, "mode": "multipart/files[]", "classes": list(labels.keys())}


@app.post("/predict")
async def predict(files_: list[UploadFile] = File(..., alias="files[]")):
    try:
        t0 = time.time()
        results = []

        # The prompt list is constant, so tokenize it once per request
        # instead of once per uploaded frame. SigLIP was trained with
        # max-length padding, so padding="max_length" scores better than
        # dynamic padding.
        text_inputs = processor(text=texts, return_tensors="pt", padding="max_length").to(device)

        for f in files_:
            data = await f.read()
            image = Image.open(io.BytesIO(data))
            # Honor the EXIF orientation flag, then crop away the white background.
            image = ImageOps.exif_transpose(image).convert("RGB")
            image = crop_foreground_ignore_white(image)

            image_inputs = processor(images=image, return_tensors="pt").to(device)
            # Match the model's dtype (fp16 on GPU) to avoid a dtype mismatch.
            image_inputs["pixel_values"] = image_inputs["pixel_values"].to(model.dtype)

            with torch.inference_mode():
                txt_emb, img_emb = None, None

                # Preferred path: dedicated projection heads.
                if hasattr(model, "get_text_features"):
                    txt_emb = model.get_text_features(**text_inputs)
                if hasattr(model, "get_image_features"):
                    img_emb = model.get_image_features(**image_inputs)

                # Fallback: a joint forward pass that exposes the embeddings
                # on the output object.
                if txt_emb is None or img_emb is None:
                    joint = processor(text=texts, images=image, return_tensors="pt", padding="max_length").to(device)
                    out = model(**joint)
                    if txt_emb is None:
                        txt_emb = getattr(out, "text_embeds", getattr(out, "text_embeds_projected", None))
                    if img_emb is None:
                        img_emb = getattr(out, "image_embeds", getattr(out, "image_embeds_projected", None))

                # Cosine similarity between the image and every prompt,
                # softmaxed into a distribution over prompts.
                img_emb = torch.nn.functional.normalize(img_emb, dim=-1)
                txt_emb = torch.nn.functional.normalize(txt_emb, dim=-1)
                logits = (img_emb @ txt_emb.t()).squeeze(0)
                probs = torch.softmax(logits.float().cpu(), dim=-1).tolist()

            # Average the prompt probabilities within each class group.
            idx, scores = 0, {}
            for key, group in labels.items():
                g = probs[idx: idx + len(group)]
                scores[key] = sum(g) / len(g)
                idx += len(group)

            best = max(scores, key=scores.get)
            conf = round(float(scores[best]), 3)
            results.append((best, conf))

        # Vote across frames: the winning class has the highest mean confidence.
        votos = {}
        for r, c in results:
            votos.setdefault(r, []).append(c)
        final = max(votos, key=lambda k: sum(votos[k]) / len(votos[k]))
        conf = round(sum(votos[final]) / len(votos[final]), 3)
        latency = round(time.time() - t0, 2)

        print(f"[OK] {final} ({conf}) in {latency}s")
        return JSONResponse({"label": final, "conf": conf, "latency_s": latency})

    except Exception as e:
        print(f"[ERROR] {e}")
        return JSONResponse({"error": str(e)}, status_code=500)