| import os |
| import json |
| import requests |
| import shutil |
| import logging |
| import cloudinary |
| import cloudinary.uploader |
| from requests.adapters import HTTPAdapter |
| from urllib3.util.retry import Retry |
| from fastapi import FastAPI, Body, HTTPException, BackgroundTasks |
| from fastapi.responses import JSONResponse |
| from dotenv import load_dotenv |
| from pydantic import BaseModel, HttpUrl, validator |
| from typing import List, Optional |
| from pipeline import run_intervision_pipeline |
|
|
| |
# Process-wide logging: INFO and above, each line tagged with its level and a
# time-of-day stamp (no date).
_LOG_FORMAT = "[%(levelname)s] %(asctime)s - %(message)s"
_LOG_DATE_FORMAT = "%H:%M:%S"

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Module-level logger used by everything below.
log = logging.getLogger(__name__)
|
|
| |
# Shared HTTP session. Requests sent through it go through an adapter that
# allows up to 3 retries with a backoff factor of 1 and treats the listed
# status codes (429 and common 5xx) as retryable.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
# Same adapter for both schemes; https is mounted first, then http.
for _scheme in ("https://", "http://"):
    http.mount(_scheme, adapter)
|
|
# Load variables from a .env file (if any) before reading the environment.
load_dotenv()
app = FastAPI(title="Intervision AI Engine")

# Cloudinary credentials come from the environment; any of them may be None.
_cloud_name, _cloud_key, _cloud_secret = (
    os.getenv(_var)
    for _var in (
        "CLOUDINARY_CLOUD_NAME",
        "CLOUDINARY_API_KEY",
        "CLOUDINARY_API_SECRET",
    )
)

# Warn once at import time if anything is missing. The SDK is still configured
# below; code paths that need Cloudinary re-check these three values.
if not (_cloud_name and _cloud_key and _cloud_secret):
    log.warning(
        "Cloudinary credentials missing β video upload will be skipped. "
        "Set CLOUDINARY_CLOUD_NAME, CLOUDINARY_API_KEY, CLOUDINARY_API_SECRET."
    )

cloudinary.config(
    cloud_name=_cloud_name,
    api_key=_cloud_key,
    api_secret=_cloud_secret,
)
|
|
| |
# Working directories (relative to the current working directory), created at
# import time if they do not exist yet.
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"
for _work_dir in (RESULT_DIR, UPLOAD_DIR):
    os.makedirs(_work_dir, exist_ok=True)
|
|
|
|
| |
class Answer(BaseModel):
    """One question entry of an interview request.

    ``startedAt`` / ``submittedAt`` are ``HH:MM:SS``-shaped strings; an empty
    value is normalised to ``"00:00:00"`` by the validator below.
    """

    aiQuestionId: int
    questionText: str
    expectedAnswer: str
    isAnswered: bool
    isSkipped: bool
    isFailed: bool
    startedAt: str
    submittedAt: str

    @validator("startedAt", "submittedAt")
    def validate_timestamp(cls, v):
        """Require three colon-separated integer fields; default empty input.

        Raises:
            ValueError: if the value does not have exactly three parts or any
                part is not an integer.
        """
        if v:
            fields = v.split(":")
            if len(fields) != 3:
                raise ValueError(f"Timestamp must be HH:MM:SS, got: {v!r}")
            try:
                # Only checking that each field parses; the value is returned as-is.
                for field in fields:
                    int(field)
            except ValueError:
                raise ValueError(f"Timestamp parts must be integers, got: {v!r}")
            return v
        return "00:00:00"
|
|
|
|
class InterviewRequest(BaseModel):
    """Request body of ``/process-interview``: a session id, the source video
    URL, the URL results are posted back to, and at least one answer entry."""

    sessionId: str
    originalVideoUrl: HttpUrl
    callbackBaseUrl: HttpUrl
    answers: List[Answer]

    @validator("answers")
    def answers_not_empty(cls, v):
        """Reject an empty ``answers`` list with a ValueError."""
        if v:
            return v
        raise ValueError("answers list must not be empty")
|
|
|
|
class DeleteVideoRequest(BaseModel):
    """Request body of ``/delete-video-by-url``."""

    # URL of the video to delete; kept as a plain string and parsed by the
    # endpoint itself.
    videoUrl: str
|
|
|
|
| |
def time_to_seconds(t_str: str) -> int:
    """Turn an ``HH:MM:SS`` string into a number of seconds.

    Never raises: an empty value, a string without exactly three
    colon-separated parts, non-integer parts, or a negative total all yield 0
    (the last three are logged as warnings).
    """
    if not t_str:
        return 0
    try:
        fields = t_str.split(":")
        if len(fields) == 3:
            hours, minutes, seconds = (int(field) for field in fields)
            seconds_total = hours * 3600 + minutes * 60 + seconds
            if seconds_total >= 0:
                return seconds_total
            log.warning("time_to_seconds: negative value %d β clamping to 0", seconds_total)
            return 0
        log.warning("time_to_seconds: unexpected format %r β defaulting to 0", t_str)
        return 0
    except Exception as e:
        log.warning("time_to_seconds(%r) error: %s β defaulting to 0", t_str, e)
        return 0
|
|
|
|
def _send_callback(callback_url: str, payload: dict, session_id: str):
    """POST ``payload`` as JSON to the callback endpoint. Logs but never raises.

    Args:
        callback_url: Destination URL; a trailing ``/`` is stripped before use.
            A falsy value (``None`` / ``""``) is logged and the call is skipped.
        payload: JSON-serialisable body to send.
        session_id: Used only to label log messages.
    """
    # Without this guard str(None) would become the literal URL "None" and the
    # failure would surface as a confusing "Unexpected error ... Invalid URL".
    if not callback_url:
        log.error(
            "[CALLBACK] No callback URL for session %s; results not sent",
            session_id,
        )
        return

    try:
        endpoint = str(callback_url).rstrip("/")
        resp = requests.post(endpoint, json=payload, timeout=30)
        resp.raise_for_status()
        log.info("[CALLBACK] %s β HTTP %d", endpoint, resp.status_code)
    # Timeout is handled before ConnectionError because a connect timeout is
    # an instance of both.
    except requests.exceptions.Timeout:
        log.error("[CALLBACK] Timeout sending results for session %s", session_id)
    except requests.exceptions.ConnectionError as e:
        log.error("[CALLBACK] Connection error for session %s: %s", session_id, e)
    except requests.exceptions.HTTPError as e:
        log.error("[CALLBACK] Server returned error for session %s: %s", session_id, e)
    except Exception as e:
        # Best-effort delivery: anything else is logged and swallowed.
        log.error("[CALLBACK] Unexpected error for session %s: %s", session_id, e)
|
|
|
|
def _cleanup(paths: list):
    """Delete each given file or directory tree, best-effort.

    Paths that are neither an existing file nor an existing directory are
    ignored. A failure on one path is logged as a warning and does not stop
    the remaining paths from being processed.
    """
    for target in paths:
        try:
            if os.path.isfile(target):
                os.remove(target)
                log.info("[CLEANUP] Removed file: %s", target)
                continue
            if os.path.isdir(target):
                shutil.rmtree(target)
                log.info("[CLEANUP] Removed dir: %s", target)
        except Exception as e:
            log.warning("[CLEANUP] Could not remove %s: %s", target, e)
|
|
|
|
| |
def _safe_session_name(session_id) -> str:
    """Map a caller-supplied session id to a single safe path component.

    Every character other than letters, digits, ``-`` and ``_`` is replaced by
    ``_``, so the result contains no path separator and no ``.`` and cannot
    escape the directory it is joined onto. An empty result becomes
    ``"unknown"``.
    """
    cleaned = "".join(
        ch if (ch.isalnum() or ch in "-_") else "_" for ch in str(session_id)
    )
    return cleaned or "unknown"


def _failure_payload(session_id, message: str) -> dict:
    """Build the callback body sent when a job fails before producing results."""
    return {
        "sessionId": session_id, "finalVideoUrl": None, "report": [],
        "error": message,
    }


def _zero_score_report(question_id, status: str) -> dict:
    """Build the all-zero report entry used for questions that are not analysed."""
    return {
        "questionId": question_id, "userAnswerText": "N/A",
        "score": 0.0, "relevance": 0.0, "confidence": 0.0,
        "stress": 0.0, "clarity": 0.0, "pauses": 0.0,
        "toneOfVoice": 3, "status": status,
    }


def _download_video(video_url, destination: str) -> int:
    """Stream ``video_url`` into ``destination`` and return the bytes written.

    Raises:
        requests.exceptions.RequestException: on network / HTTP failures.
        ValueError: if nothing was downloaded.
    """
    # The context manager releases the streamed connection even if writing fails.
    with http.get(str(video_url), stream=True, timeout=300) as response:
        response.raise_for_status()
        total_bytes = 0
        with open(destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
                    total_bytes += len(chunk)

    if total_bytes == 0 or not os.path.exists(destination):
        raise ValueError("Downloaded file is empty or missing")
    return total_bytes


def _split_answers(answers):
    """Split answers into pipeline inputs and zero-score placeholder reports.

    Returns:
        ``(final_questions, skipped_failed_reports)``. Answered questions with
        a positive duration go to the first list; unanswered questions and
        answered ones whose end time is not after the start time get a
        placeholder entry in the second list.
    """
    final_questions = []
    skipped_failed_reports = []

    for q in answers or []:
        qid = q.get("aiQuestionId")

        if not q.get("isAnswered"):
            status = "skipped" if q.get("isSkipped") else "failed"
            skipped_failed_reports.append(_zero_score_report(qid, status))
            continue

        start_s = time_to_seconds(q.get("startedAt", "00:00:00"))
        end_s = time_to_seconds(q.get("submittedAt", "00:00:00"))

        if end_s <= start_s:
            log.warning(
                "[JOB] Q%s: end_time (%d) <= start_time (%d) β treating as unanswered",
                qid, end_s, start_s,
            )
            skipped_failed_reports.append(_zero_score_report(qid, "invalid_timing"))
            continue

        question_text = (q.get("questionText") or "").strip()
        expected_answer = (q.get("expectedAnswer") or "").strip()

        if not question_text:
            log.warning("[JOB] Q%s: empty questionText", qid)

        final_questions.append({
            "question_id": qid,
            "question_text": question_text,
            "ideal_answer": expected_answer,
            "start_time": start_s,
            "end_time": end_s,
        })

    return final_questions, skipped_failed_reports


def _load_report(report_path) -> list:
    """Read ``listOfAnswerReport`` from the pipeline's report file.

    Returns an empty list (and logs) when the file is missing, unreadable, not
    valid JSON, or does not have the expected dict-with-a-list shape.
    """
    if not (report_path and os.path.exists(report_path)):
        log.warning("[JOB] report.json not found β using empty results")
        return []

    try:
        with open(report_path, "r", encoding="utf-8") as f:
            report = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.error("[JOB] Failed to parse report.json: %s", e)
        return []

    if not isinstance(report, dict):
        log.error("[JOB] Failed to parse report.json: top-level value is not an object")
        return []

    results = report.get("listOfAnswerReport", [])
    if not isinstance(results, list):
        # A non-list here would break the list concatenation in the final payload.
        log.error("[JOB] Failed to parse report.json: listOfAnswerReport is not a list")
        return []
    return results


def _upload_annotated_video(final_video_path, session_id) -> Optional[str]:
    """Upload the annotated video to Cloudinary.

    Returns the ``secure_url`` from the upload response, or ``None`` when there
    is nothing to upload, Cloudinary is not configured, or the upload fails.
    """
    if not (final_video_path and os.path.exists(final_video_path)):
        log.warning("[JOB] No final video to upload")
        return None
    if os.path.getsize(final_video_path) == 0:
        log.warning("[JOB] Annotated video is 0 bytes β skipping upload")
        return None
    if not all([_cloud_name, _cloud_key, _cloud_secret]):
        log.warning("[JOB] Cloudinary not configured β skipping upload")
        return None

    try:
        upload_res = cloudinary.uploader.upload(
            final_video_path,
            public_id=f"res_{session_id}",
            folder="intervision_results",
            resource_type="video",
        )
        final_video_url = upload_res.get("secure_url")
    except cloudinary.exceptions.Error as e:
        log.error("[JOB] Cloudinary upload error: %s", e)
        return None
    except Exception as e:
        log.error("[JOB] Unexpected upload error: %s", e)
        return None

    if not final_video_url:
        log.warning("[JOB] Cloudinary returned no secure_url")
        return None
    log.info("[JOB] Video uploaded: %s", final_video_url)
    return final_video_url


def _run_session_job(session_data, session_id, video_url, callback_url,
                     session_dir, local_input_path) -> bool:
    """Download, analyse, upload and report one session.

    Returns True when the final results callback was attempted, False when the
    job stopped at the download step (a failure callback was sent instead).
    """
    os.makedirs(session_dir, exist_ok=True)

    error_message = None
    try:
        log.info("[JOB] Downloading video: %s", video_url)
        total_bytes = _download_video(video_url, local_input_path)
        log.info("[JOB] Downloaded %.2f MB", total_bytes / (1024 * 1024))
    except requests.exceptions.Timeout:
        log.error("[JOB] Download timed out for session %s", session_id)
        error_message = "Video download timed out"
    except requests.exceptions.HTTPError as e:
        log.error("[JOB] HTTP error downloading video: %s", e)
        error_message = f"Video download HTTP error: {e}"
    except Exception as e:
        log.error("[JOB] Download failed: %s", e)
        error_message = f"Video download failed: {e}"

    if error_message is not None:
        _send_callback(
            callback_url, _failure_payload(session_id, error_message), session_id
        )
        return False

    final_questions, skipped_failed_reports = _split_answers(
        session_data.get("answers", [])
    )

    ai_results = []
    final_video_url = None

    if final_questions:
        log.info("[JOB] Running pipeline for %d questions β¦", len(final_questions))
        try:
            final_video_path, report_path = run_intervision_pipeline(
                local_input_path, final_questions, session_dir
            )
            ai_results = _load_report(report_path)
            final_video_url = _upload_annotated_video(final_video_path, session_id)
        except Exception as e:
            # Keep going so the caller still receives the placeholder reports;
            # exc_info records the traceback for diagnosis.
            log.error(
                "[JOB] Pipeline crashed for session %s: %s", session_id, e,
                exc_info=True,
            )
    else:
        log.info("[JOB] No answered questions β skipping pipeline")

    final_payload = {
        "sessionId": session_id,
        "finalVideoUrl": final_video_url,
        "report": ai_results + skipped_failed_reports,
    }
    _send_callback(callback_url, final_payload, session_id)
    return True


def background_processing(session_data: dict):
    """Run one interview-analysis job end to end.

    Reads ``sessionId``, ``originalVideoUrl``, ``callbackBaseUrl`` and
    ``answers`` from ``session_data``; downloads the video, runs the pipeline
    on the answered questions, uploads the annotated video, and POSTs the
    combined report to the callback URL. Temporary files are always removed.

    Args:
        session_data: Plain-dict form of the interview request.
    """
    session_id = session_data.get("sessionId", "unknown")
    video_url = session_data.get("originalVideoUrl")
    callback_url = session_data.get("callbackBaseUrl")

    log.info("[JOB] Started β session: %s", session_id)

    # Validate before touching the filesystem. The callback URL is checked
    # first because the missing-video branch needs it to report the error.
    if not callback_url:
        log.error("[JOB] No callback URL for session %s", session_id)
        return

    if not video_url:
        log.error("[JOB] No video URL for session %s", session_id)
        _send_callback(
            callback_url, _failure_payload(session_id, "Missing video URL"), session_id
        )
        return

    # session_id is caller-controlled and these paths are later deleted
    # recursively, so only a sanitised form may be used as a path component
    # (a raw "/" or "../.." would point outside the temp directories).
    safe_name = _safe_session_name(session_id)
    session_dir = os.path.join(RESULT_DIR, safe_name)
    local_input_path = os.path.join(UPLOAD_DIR, f"{safe_name}_input.mp4")

    completed = False
    try:
        completed = _run_session_job(
            session_data, session_id, video_url, callback_url,
            session_dir, local_input_path,
        )
    finally:
        # Runs on success, on handled failures and on unexpected exceptions.
        _cleanup([local_input_path, session_dir])

    if completed:
        log.info("[JOB] Done β session: %s", session_id)
|
|
|
|
| |
| |
| |
|
|
@app.get("/")
async def root():
    """Liveness endpoint: returns a fixed status/message pair."""
    return dict(
        status="Intervision AI Engine Running",
        message="API is working successfully",
    )
|
|
|
|
@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    """
    Queue an interview session for background processing.

    The validated request is converted to a plain dict and handed to
    ``background_processing`` via FastAPI's background tasks; the response is
    returned right away with the session id echoed back.
    """
    session_id = data.sessionId
    log.info("[API] /process-interview β session: %s", session_id)

    job_payload = data.dict()
    background_tasks.add_task(background_processing, job_payload)

    return dict(message="Processing started", sessionId=session_id)
|
|
|
|
def _public_id_from_url(video_url: str) -> str:
    """Derive the Cloudinary ``public_id`` from a video URL.

    Only the URL's path is considered, so a query string or fragment never
    leaks into the id, and percent-escapes in the last two path segments are
    decoded. The id is the last segment without its extension, prefixed with
    the parent segment when that segment contains ``intervision_results``.

    Raises:
        ValueError: if the last path segment has no extension or an empty stem.
    """
    # Function-scope import: the module's import block does not bring these in.
    from urllib.parse import unquote, urlparse

    segments = urlparse(video_url).path.split("/")

    filename_with_ext = unquote(segments[-1])
    if "." not in filename_with_ext:
        raise ValueError(f"Cannot parse filename from URL: {video_url!r}")

    filename = filename_with_ext.rsplit(".", 1)[0]
    if not filename:
        raise ValueError(f"Cannot parse filename from URL: {video_url!r}")

    parent = unquote(segments[-2]) if len(segments) >= 2 else ""
    folder = parent if "intervision_results" in parent else ""
    return f"{folder}/{filename}" if folder else filename


@app.post("/delete-video-by-url")
async def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a Cloudinary video by its public URL.

    Returns:
        ``{"status": "success", ...}`` when Cloudinary reports ``ok``; a 404
        JSON response when it reports ``not found``; a 500 JSON response for
        any other result.

    Raises:
        HTTPException: 400 for a missing or unparsable URL, 503 when
            Cloudinary credentials are not configured, 502 for Cloudinary API
            errors, 500 for anything unexpected.
    """
    # fastapi is already a dependency of this module; imported here so the
    # endpoint does not rely on a change to the file's import block.
    from fastapi.concurrency import run_in_threadpool

    video_url = (data.videoUrl or "").strip()
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")

    if not all([_cloud_name, _cloud_key, _cloud_secret]):
        raise HTTPException(
            status_code=503,
            detail="Cloudinary is not configured on this server",
        )

    try:
        public_id = _public_id_from_url(video_url)

        log.info("[DELETE] Deleting public_id: %s", public_id)
        # destroy() is a blocking network call; run it in the thread pool so
        # the event loop keeps serving other requests meanwhile.
        result = await run_in_threadpool(
            cloudinary.uploader.destroy, public_id, resource_type="video"
        )

        outcome = result.get("result")
        if outcome == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}

        if outcome == "not found":
            log.warning("[DELETE] public_id not found on Cloudinary: %s", public_id)
            return JSONResponse(
                status_code=404,
                content={"status": "not_found", "message": f"{public_id} was not found"},
            )

        log.warning("[DELETE] Unexpected Cloudinary result: %s", result)
        return JSONResponse(status_code=500, content={"status": "failed", "details": result})

    except cloudinary.exceptions.Error as e:
        log.error("[DELETE] Cloudinary error: %s", e)
        raise HTTPException(status_code=502, detail=f"Cloudinary error: {e}") from e
    except ValueError as e:
        # Raised by _public_id_from_url for URLs it cannot parse.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        log.error("[DELETE] Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
| |
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000.
    # uvicorn is imported here so it is only required when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)