import hashlib
import json
import logging
import os
import re
import shutil
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import cloudinary
import cloudinary.uploader
import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, HttpUrl, validator
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from pipeline import run_intervision_pipeline

# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s] %(asctime)s - %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

# ── Retry strategy ────────────────────────────────────────────────────────────
# Shared session used for the (idempotent GET) video download: retries up to
# 3 times with exponential backoff on throttling / transient server errors.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)

load_dotenv()
app = FastAPI(title="Intervision AI Engine")

# ── Cloudinary ────────────────────────────────────────────────────────────────
_cloud_name = os.getenv("CLOUDINARY_CLOUD_NAME")
_cloud_key = os.getenv("CLOUDINARY_API_KEY")
_cloud_secret = os.getenv("CLOUDINARY_API_SECRET")
# True only when all three credentials are set. Checked before every upload /
# delete so the service still starts (and processes jobs) without Cloudinary.
_CLOUDINARY_CONFIGURED = all([_cloud_name, _cloud_key, _cloud_secret])

if not _CLOUDINARY_CONFIGURED:
    log.warning(
        "Cloudinary credentials missing – video upload will be skipped. "
        "Set CLOUDINARY_CLOUD_NAME, CLOUDINARY_API_KEY, CLOUDINARY_API_SECRET."
    )

cloudinary.config(
    cloud_name=_cloud_name,
    api_key=_cloud_key,
    api_secret=_cloud_secret,
)

# ── Directories ───────────────────────────────────────────────────────────────
# Relative to the process working directory.
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"
os.makedirs(RESULT_DIR, exist_ok=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)


# ── Pydantic models ───────────────────────────────────────────────────────────
class Answer(BaseModel):
    """One interview question and the candidate's answer timing."""

    aiQuestionId: int
    questionText: str
    expectedAnswer: str
    isAnswered: bool
    isSkipped: bool
    isFailed: bool
    startedAt: str  # offset into the video, "HH:MM:SS"
    submittedAt: str  # offset into the video, "HH:MM:SS"

    @validator("startedAt", "submittedAt")
    def validate_timestamp(cls, v):
        """Require ``HH:MM:SS`` with integer parts; empty becomes ``00:00:00``."""
        if not v:
            return "00:00:00"
        parts = v.split(":")
        if len(parts) != 3:
            raise ValueError(f"Timestamp must be HH:MM:SS, got: {v!r}")
        try:
            list(map(int, parts))
        except ValueError:
            raise ValueError(f"Timestamp parts must be integers, got: {v!r}")
        return v


class InterviewRequest(BaseModel):
    """Body of ``POST /process-interview``."""

    sessionId: str
    originalVideoUrl: HttpUrl
    callbackBaseUrl: HttpUrl
    answers: List[Answer]

    @validator("answers")
    def answers_not_empty(cls, v):
        """Reject an empty ``answers`` list."""
        if not v:
            raise ValueError("answers list must not be empty")
        return v


class DeleteVideoRequest(BaseModel):
    """Body of ``POST /delete-video-by-url``."""

    videoUrl: str


# ── Helpers ───────────────────────────────────────────────────────────────────
# An id matching this is already a single, harmless path component: it starts
# with an alphanumeric (so it can never be "." / ".." / absolute) and has no
# path separators.
_SAFE_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_.-]{0,63}")


def _safe_path_component(name) -> str:
    """Return a filesystem-safe single path component derived from *name*.

    ``sessionId`` comes straight from the request body and the directory built
    from it is later removed with ``shutil.rmtree``; joining it verbatim would
    let ``"../.."``, ``"/"`` or ``""`` escape (or equal) ``RESULT_DIR``.
    Already-safe ids are returned unchanged; anything else is slugified and
    suffixed with a short hash of the original so distinct ids stay distinct.
    """
    name = "" if name is None else str(name)
    if _SAFE_NAME_RE.fullmatch(name):
        return name
    slug = re.sub(r"[^A-Za-z0-9_-]+", "_", name).strip("_")[:48] or "session"
    digest = hashlib.sha256(name.encode("utf-8")).hexdigest()[:12]
    return f"{slug}_{digest}"


def time_to_seconds(t_str: str) -> int:
    """Convert HH:MM:SS to total seconds. Returns 0 on any error."""
    if not t_str:
        return 0
    try:
        parts = t_str.split(":")
        if len(parts) != 3:
            log.warning("time_to_seconds: unexpected format %r – defaulting to 0", t_str)
            return 0
        h, m, s = map(int, parts)
        total = h * 3600 + m * 60 + s
        if total < 0:
            log.warning("time_to_seconds: negative value %d – clamping to 0", total)
            return 0
        return total
    except Exception as e:
        log.warning("time_to_seconds(%r) error: %s – defaulting to 0", t_str, e)
        return 0


def _send_callback(callback_url: str, payload: dict, session_id: str):
    """POST the final payload to the callback endpoint. Logs but never raises.

    Uses a plain ``requests.post`` rather than the retrying ``http`` session;
    urllib3's default ``Retry`` does not retry POST anyway.
    """
    try:
        endpoint = str(callback_url).rstrip("/")
        resp = requests.post(endpoint, json=payload, timeout=30)
        resp.raise_for_status()
        log.info("[CALLBACK] %s → HTTP %d", endpoint, resp.status_code)
    except requests.exceptions.Timeout:
        log.error("[CALLBACK] Timeout sending results for session %s", session_id)
    except requests.exceptions.ConnectionError as e:
        log.error("[CALLBACK] Connection error for session %s: %s", session_id, e)
    except requests.exceptions.HTTPError as e:
        log.error("[CALLBACK] Server returned error for session %s: %s", session_id, e)
    except Exception as e:
        log.error("[CALLBACK] Unexpected error for session %s: %s", session_id, e)


def _cleanup(paths: list):
    """Remove files/directories silently (failures are only logged)."""
    for path in paths:
        try:
            if os.path.isfile(path):
                os.remove(path)
                log.info("[CLEANUP] Removed file: %s", path)
            elif os.path.isdir(path):
                shutil.rmtree(path)
                log.info("[CLEANUP] Removed dir: %s", path)
        except Exception as e:
            log.warning("[CLEANUP] Could not remove %s: %s", path, e)


def _error_payload(session_id: str, message: str) -> dict:
    """Build the callback payload sent when a job fails before producing a report."""
    return {
        "sessionId": session_id,
        "finalVideoUrl": None,
        "report": [],
        "error": message,
    }


def _placeholder_report(question_id, status: str) -> dict:
    """Build a zero-score report entry for a question the pipeline did not analyse."""
    return {
        "questionId": question_id,
        "userAnswerText": "N/A",
        "score": 0.0,
        "relevance": 0.0,
        "confidence": 0.0,
        "stress": 0.0,
        "clarity": 0.0,
        "pauses": 0.0,
        "toneOfVoice": 3,
        "status": status,
    }


def _download_video(video_url, dest_path: str) -> int:
    """Stream *video_url* into *dest_path* and return the number of bytes written.

    Raises:
        requests.exceptions.RequestException: on network / HTTP failures.
        ValueError: if nothing was written.
    """
    # ``with`` closes the streamed response (returning its connection to the
    # pool) on both success and failure.
    with http.get(str(video_url), stream=True, timeout=300) as response:
        response.raise_for_status()
        total_bytes = 0
        with open(dest_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
                    total_bytes += len(chunk)
    if total_bytes == 0 or not os.path.exists(dest_path):
        raise ValueError("Downloaded file is empty or missing")
    return total_bytes


def _build_questions(answers) -> Tuple[List[dict], List[dict]]:
    """Split answers into pipeline questions and placeholder reports.

    Returns:
        ``(final_questions, placeholder_reports)``.
        - ``final_questions`` holds the answered questions with a valid time
          window, in the shape ``run_intervision_pipeline`` receives.
        - ``placeholder_reports`` holds zero-score entries with status
          ``"invalid_timing"``, ``"skipped"`` or ``"failed"``.
    """
    final_questions = []
    placeholder_reports = []
    for q in answers:
        qid = q.get("aiQuestionId")

        if not q.get("isAnswered"):
            status = "skipped" if q.get("isSkipped") else "failed"
            placeholder_reports.append(_placeholder_report(qid, status))
            continue

        start_s = time_to_seconds(q.get("startedAt", "00:00:00"))
        end_s = time_to_seconds(q.get("submittedAt", "00:00:00"))
        if end_s <= start_s:
            log.warning(
                "[JOB] Q%s: end_time (%d) <= start_time (%d) – treating as unanswered",
                qid, end_s, start_s,
            )
            placeholder_reports.append(_placeholder_report(qid, "invalid_timing"))
            continue

        question_text = (q.get("questionText") or "").strip()
        expected_answer = (q.get("expectedAnswer") or "").strip()
        if not question_text:
            log.warning("[JOB] Q%s: empty questionText", qid)

        final_questions.append({
            "question_id": qid,
            "question_text": question_text,
            "ideal_answer": expected_answer,
            "start_time": start_s,
            "end_time": end_s,
        })
    return final_questions, placeholder_reports


def _load_report(report_path) -> list:
    """Return ``listOfAnswerReport`` from the pipeline's report file, or ``[]``.

    Any missing, unreadable or wrongly-shaped report yields ``[]``, so the
    caller can always concatenate the result with a list.
    """
    if not report_path or not os.path.exists(report_path):
        log.warning("[JOB] report.json not found – using empty results")
        return []
    try:
        # JSON is UTF-8 by spec; do not depend on the platform locale.
        with open(report_path, "r", encoding="utf-8") as f:
            report = json.load(f)
    except (OSError, ValueError) as e:  # JSONDecodeError/UnicodeDecodeError are ValueErrors
        log.error("[JOB] Failed to parse report.json: %s", e)
        return []
    if not isinstance(report, dict):
        log.error("[JOB] report.json top level is %s, expected object", type(report).__name__)
        return []
    results = report.get("listOfAnswerReport", [])
    if not isinstance(results, list):
        log.error("[JOB] listOfAnswerReport is %s, expected list", type(results).__name__)
        return []
    return results


def _upload_result_video(final_video_path, session_id: str) -> Optional[str]:
    """Upload the annotated video to Cloudinary and return its ``secure_url``.

    Returns ``None`` and logs the reason in any of these cases:
    - the file is missing or empty;
    - Cloudinary is not configured;
    - the upload fails or returns no URL.
    """
    if not final_video_path or not os.path.exists(final_video_path):
        log.warning("[JOB] No final video to upload")
        return None
    if os.path.getsize(final_video_path) == 0:
        log.warning("[JOB] Annotated video is 0 bytes – skipping upload")
        return None
    if not _CLOUDINARY_CONFIGURED:
        log.warning("[JOB] Cloudinary not configured – skipping upload")
        return None
    try:
        upload_res = cloudinary.uploader.upload(
            final_video_path,
            public_id=f"res_{session_id}",
            folder="intervision_results",
            resource_type="video",
        )
        final_video_url = upload_res.get("secure_url")
    except cloudinary.exceptions.Error as e:
        log.error("[JOB] Cloudinary upload error: %s", e)
        return None
    except Exception as e:
        log.error("[JOB] Unexpected upload error: %s", e)
        return None
    if not final_video_url:
        log.warning("[JOB] Cloudinary returned no secure_url")
    else:
        log.info("[JOB] Video uploaded: %s", final_video_url)
    return final_video_url


def _run_pipeline(local_input_path: str, questions: list, session_dir: str,
                  session_id: str) -> Tuple[list, Optional[str]]:
    """Run the AI pipeline, then load its report and upload its video.

    Never raises. On a crash it keeps whatever was obtained so far, so the
    callback still fires with at least the placeholder reports.

    Returns:
        ``(ai_results, final_video_url)``.
    """
    ai_results: list = []
    final_video_url: Optional[str] = None
    try:
        final_video_path, report_path = run_intervision_pipeline(
            local_input_path, questions, session_dir
        )
        ai_results = _load_report(report_path)
        final_video_url = _upload_result_video(final_video_path, session_id)
    except Exception as e:
        log.exception("[JOB] Pipeline crashed for session %s: %s", session_id, e)
    return ai_results, final_video_url


# ── Background job ────────────────────────────────────────────────────────────
def background_processing(session_data: dict):
    """Run one interview-analysis job end to end.

    Steps:
    1. Validate the URLs.
    2. Download the source video.
    3. Split answers into pipeline questions and placeholder reports.
    4. Run the pipeline and upload the annotated video.
    5. POST the combined report to ``callbackBaseUrl``.
    6. Delete the job's temporary files.

    Known failures are logged and, when a callback URL exists, reported
    through it. Temporary files are removed even if an unexpected error
    escapes.

    Args:
        session_data: ``InterviewRequest.dict()``. The keys read are
            ``sessionId``, ``originalVideoUrl``, ``callbackBaseUrl`` and
            ``answers``.
    """
    session_id = session_data.get("sessionId", "unknown")
    video_url = session_data.get("originalVideoUrl")
    callback_url = session_data.get("callbackBaseUrl")

    log.info("[JOB] Started – session: %s", session_id)

    # ── 1. Validate inputs ────────────────────────────────────────────────────
    # Check the callback URL first: without it no other error can be reported.
    if not callback_url:
        log.error("[JOB] No callback URL for session %s", session_id)
        return
    if not video_url:
        log.error("[JOB] No video URL for session %s", session_id)
        _send_callback(callback_url, _error_payload(session_id, "Missing video URL"), session_id)
        return

    # sessionId is client-controlled: never use it verbatim in a path that is
    # later removed with rmtree.
    fs_name = _safe_path_component(session_id)
    session_dir = os.path.join(RESULT_DIR, fs_name)
    local_input_path = os.path.join(UPLOAD_DIR, f"{fs_name}_input.mp4")
    os.makedirs(session_dir, exist_ok=True)

    try:
        # ── 2. Download video ─────────────────────────────────────────────────
        try:
            log.info("[JOB] Downloading video: %s", video_url)
            total_bytes = _download_video(video_url, local_input_path)
        except requests.exceptions.Timeout:
            log.error("[JOB] Download timed out for session %s", session_id)
            _send_callback(
                callback_url, _error_payload(session_id, "Video download timed out"), session_id
            )
            return
        except requests.exceptions.HTTPError as e:
            log.error("[JOB] HTTP error downloading video: %s", e)
            _send_callback(
                callback_url,
                _error_payload(session_id, f"Video download HTTP error: {e}"),
                session_id,
            )
            return
        except Exception as e:
            log.error("[JOB] Download failed: %s", e)
            _send_callback(
                callback_url, _error_payload(session_id, f"Video download failed: {e}"), session_id
            )
            return
        log.info("[JOB] Downloaded %.2f MB", total_bytes / (1024 * 1024))

        # ── 3. Parse & validate question timings ──────────────────────────────
        final_questions, skipped_failed_reports = _build_questions(
            session_data.get("answers") or []
        )

        # ── 4. Run pipeline ───────────────────────────────────────────────────
        ai_results: list = []
        final_video_url: Optional[str] = None
        if final_questions:
            log.info("[JOB] Running pipeline for %d questions …", len(final_questions))
            ai_results, final_video_url = _run_pipeline(
                local_input_path, final_questions, session_dir, session_id
            )
        else:
            log.info("[JOB] No answered questions – skipping pipeline")

        # ── 5. Callback ───────────────────────────────────────────────────────
        final_payload = {
            "sessionId": session_id,
            "finalVideoUrl": final_video_url,
            "report": ai_results + skipped_failed_reports,
        }
        _send_callback(callback_url, final_payload, session_id)
    finally:
        # ── 6. Cleanup ────────────────────────────────────────────────────────
        _cleanup([local_input_path, session_dir])

    log.info("[JOB] Done – session: %s", session_id)


# ══════════════════════════════════════════════════════════════════════════════
# ROUTES
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/")
async def root():
    """Health check."""
    return {
        "status": "Intervision AI Engine Running",
        "message": "API is working successfully",
    }


@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    """
    Accepts an interview session and queues it for background processing.
    Responds immediately; results are sent to callbackBaseUrl when ready.
    """
    log.info("[API] /process-interview – session: %s", data.sessionId)
    background_tasks.add_task(background_processing, data.dict())
    return {
        "message": "Processing started",
        "sessionId": data.sessionId,
    }


# Plain ``def`` (not ``async def``): cloudinary.uploader.destroy is a blocking
# HTTP call, and FastAPI runs sync endpoints in a threadpool instead of on the
# event loop.
@app.post("/delete-video-by-url")
def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a Cloudinary video by its public URL.

    The public id is the URL's last path segment without its extension. It is
    prefixed with the parent folder when that segment contains
    ``intervision_results`` (the folder used by uploads in this service).
    """
    video_url = (data.videoUrl or "").strip()
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")

    if not _CLOUDINARY_CONFIGURED:
        raise HTTPException(
            status_code=503,
            detail="Cloudinary is not configured on this server",
        )

    try:
        # Only the path matters; a query string or fragment must not leak into
        # the public id.
        url_parts = urlparse(video_url).path.split("/")
        filename_with_ext = url_parts[-1]
        if "." not in filename_with_ext:
            raise ValueError(f"Cannot parse filename from URL: {video_url!r}")

        filename = filename_with_ext.rsplit(".", 1)[0]
        folder = (
            url_parts[-2]
            if len(url_parts) >= 2 and "intervision_results" in url_parts[-2]
            else ""
        )
        public_id = f"{folder}/{filename}" if folder else filename

        log.info("[DELETE] Deleting public_id: %s", public_id)
        result = cloudinary.uploader.destroy(public_id, resource_type="video")

        if result.get("result") == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}

        # "not found" is still a valid non-error state
        if result.get("result") == "not found":
            log.warning("[DELETE] public_id not found on Cloudinary: %s", public_id)
            return JSONResponse(
                status_code=404,
                content={"status": "not_found", "message": f"{public_id} was not found"},
            )

        log.warning("[DELETE] Unexpected Cloudinary result: %s", result)
        return JSONResponse(status_code=500, content={"status": "failed", "details": result})

    except cloudinary.exceptions.Error as e:
        log.error("[DELETE] Cloudinary error: %s", e)
        raise HTTPException(status_code=502, detail=f"Cloudinary error: {e}")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        log.error("[DELETE] Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)