# Fayza38's picture
# Update app.py
# 9fef579 verified
import os
import json
import requests
import shutil
import logging
import cloudinary
import cloudinary.uploader
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from pydantic import BaseModel, HttpUrl, validator
from typing import List, Optional
from pipeline import run_intervision_pipeline
# ── Logging ──────────────────────────────────────────────────────────────────
# Root logger: INFO and above, rendered as "[LEVEL] HH:MM:SS - message".
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="[%(levelname)s] %(asctime)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
# ── Retry strategy ────────────────────────────────────────────────────────────
# Up to 3 retries with a backoff factor of 1 for the listed
# rate-limit / server-error status codes.
retry_strategy = Retry(
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    total=3,
)
adapter = HTTPAdapter(max_retries=retry_strategy)

# Shared session: the same retrying adapter serves both URL schemes.
http = requests.Session()
for _scheme in ("https://", "http://"):
    http.mount(_scheme, adapter)
# Read a local .env file (if any) into the process environment. This has to
# happen before the os.getenv() lookups for the Cloudinary settings below.
load_dotenv()
# FastAPI application object; the route decorators further down register
# their handlers on it.
app = FastAPI(title="Intervision AI Engine")
# ── Cloudinary ────────────────────────────────────────────────────────────────
# Credentials come from the environment. The three module-level values are
# re-checked by the upload and delete code paths to decide whether Cloudinary
# can be used at all.
_cloud_name = os.environ.get("CLOUDINARY_CLOUD_NAME")
_cloud_key = os.environ.get("CLOUDINARY_API_KEY")
_cloud_secret = os.environ.get("CLOUDINARY_API_SECRET")

if not (_cloud_name and _cloud_key and _cloud_secret):
    log.warning(
        "Cloudinary credentials missing – video upload will be skipped. "
        "Set CLOUDINARY_CLOUD_NAME, CLOUDINARY_API_KEY, CLOUDINARY_API_SECRET."
    )

# Configure the SDK unconditionally; missing values are passed through as None.
cloudinary.config(
    api_secret=_cloud_secret,
    api_key=_cloud_key,
    cloud_name=_cloud_name,
)
# ── Directories ───────────────────────────────────────────────────────────────
# Scratch space, relative to the working directory: per-session pipeline
# output lives under RESULT_DIR, downloaded input videos under UPLOAD_DIR.
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"

# Create both trees at import time; already-existing directories are fine.
for _scratch_dir in (RESULT_DIR, UPLOAD_DIR):
    os.makedirs(_scratch_dir, exist_ok=True)
# ── Pydantic models ───────────────────────────────────────────────────────────
# One question entry of an interview session, as received in the request body.
class Answer(BaseModel):
    aiQuestionId: int
    questionText: str
    expectedAnswer: str
    # Outcome flags; the background job analyses the entry only when
    # isAnswered is true and otherwise reports it as skipped or failed.
    isAnswered: bool
    isSkipped: bool
    isFailed: bool
    # "HH:MM:SS" strings, checked by validate_timestamp below.
    startedAt: str
    submittedAt: str

    @validator("startedAt", "submittedAt")
    def validate_timestamp(cls, v):
        """Check that a timestamp has three integer parts separated by ':'.

        An empty value is replaced by "00:00:00"; any other value is returned
        unchanged once it has passed the checks. Raises ValueError otherwise.
        """
        if not v:
            return "00:00:00"

        segments = v.split(":")
        if len(segments) != 3:
            raise ValueError(f"Timestamp must be HH:MM:SS, got: {v!r}")

        # Each part only has to be parseable as an int; the parsed numbers
        # themselves are discarded.
        for segment in segments:
            try:
                int(segment)
            except ValueError:
                raise ValueError(f"Timestamp parts must be integers, got: {v!r}")
        return v
# Request body of POST /process-interview.
class InterviewRequest(BaseModel):
    sessionId: str
    # Location the recorded interview video is downloaded from.
    originalVideoUrl: HttpUrl
    # Endpoint the finished report is POSTed to.
    callbackBaseUrl: HttpUrl
    answers: List[Answer]

    @validator("answers")
    def answers_not_empty(cls, v):
        """Reject a request whose answers list is empty."""
        if v:
            return v
        raise ValueError("answers list must not be empty")
# Request body of POST /delete-video-by-url.
class DeleteVideoRequest(BaseModel):
    # URL of the Cloudinary video to delete; the route derives the
    # Cloudinary public_id from its last path segments.
    videoUrl: str
# ── Helpers ───────────────────────────────────────────────────────────────────
def time_to_seconds(t_str: str) -> int:
    """Turn an "HH:MM:SS" string into a number of seconds.

    Never raises: an empty value, a string that does not have exactly three
    ':'-separated integer parts, or a negative total all yield 0 (with a
    warning logged for everything except the empty value).
    """
    if not t_str:
        return 0
    try:
        fields = t_str.split(":")
        if len(fields) == 3:
            hours, minutes, seconds = (int(field) for field in fields)
            total = hours * 3600 + minutes * 60 + seconds
            if total >= 0:
                return total
            log.warning("time_to_seconds: negative value %d – clamping to 0", total)
        else:
            log.warning("time_to_seconds: unexpected format %r – defaulting to 0", t_str)
    except Exception as e:
        log.warning("time_to_seconds(%r) error: %s – defaulting to 0", t_str, e)
    return 0
def _send_callback(callback_url: str, payload: dict, session_id: str):
    """POST the final payload to the callback endpoint. Logs but never raises.

    Args:
        callback_url: Endpoint to notify; converted with ``str()`` and
            stripped of trailing slashes before use.
        payload: JSON-serialisable body of the POST request.
        session_id: Only used to label log messages.
    """
    if not callback_url:
        # Without this guard str(None) would be POSTed to the literal URL "None".
        log.error("[CALLBACK] No callback URL for session %s – nothing sent", session_id)
        return
    try:
        endpoint = str(callback_url).rstrip("/")
        resp = requests.post(endpoint, json=payload, timeout=30)
        resp.raise_for_status()
        log.info("[CALLBACK] %s → HTTP %d", endpoint, resp.status_code)
    except requests.exceptions.Timeout:
        log.error("[CALLBACK] Timeout sending results for session %s", session_id)
    except requests.exceptions.ConnectionError as e:
        log.error("[CALLBACK] Connection error for session %s: %s", session_id, e)
    except requests.exceptions.HTTPError as e:
        log.error("[CALLBACK] Server returned error for session %s: %s", session_id, e)
    except Exception as e:
        # Last-resort boundary: the background job must not die because the
        # notification could not be delivered.
        log.error("[CALLBACK] Unexpected error for session %s: %s", session_id, e)
def _cleanup(paths: list):
    """Best-effort removal of the given files and directory trees.

    Paths that do not exist are ignored; a failure to remove one entry is
    logged as a warning and does not stop the remaining entries from being
    processed.
    """
    for target in paths:
        try:
            if os.path.isfile(target):
                os.remove(target)
                log.info("[CLEANUP] Removed file: %s", target)
                continue
            if os.path.isdir(target):
                shutil.rmtree(target)
                log.info("[CLEANUP] Removed dir: %s", target)
        except Exception as err:
            log.warning("[CLEANUP] Could not remove %s: %s", target, err)
# ── Background job ────────────────────────────────────────────────────────────
def _is_safe_session_id(session_id) -> bool:
    """Return True if *session_id* can be used as a single path component."""
    if not isinstance(session_id, str) or session_id in ("", ".", ".."):
        return False
    if "\x00" in session_id:
        return False
    # basename() differs from its input as soon as the value contains a path
    # separator (or, on Windows, a drive prefix).
    return os.path.basename(session_id) == session_id


def _failure_payload(session_id, message: str) -> dict:
    """Build the callback body reporting that the session could not be processed."""
    return {
        "sessionId": session_id, "finalVideoUrl": None, "report": [],
        "error": message,
    }


def _placeholder_report(question_id, status: str) -> dict:
    """Build the all-zero report entry for a question that was not analysed."""
    return {
        "questionId": question_id, "userAnswerText": "N/A",
        "score": 0.0, "relevance": 0.0, "confidence": 0.0,
        "stress": 0.0, "clarity": 0.0, "pauses": 0.0,
        "toneOfVoice": 3, "status": status,
    }


def _download_video(video_url, dest_path: str, session_id):
    """Stream *video_url* into *dest_path*.

    Returns None on success, otherwise the error text for the callback.
    """
    try:
        log.info("[JOB] Downloading video: %s", video_url)
        total_bytes = 0
        # The context manager closes the streamed response on every exit path.
        with http.get(str(video_url), stream=True, timeout=300) as response:
            response.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
                        total_bytes += len(chunk)
        if total_bytes == 0 or not os.path.exists(dest_path):
            raise ValueError("Downloaded file is empty or missing")
        log.info("[JOB] Downloaded %.2f MB", total_bytes / (1024 * 1024))
        return None
    except requests.exceptions.Timeout:
        log.error("[JOB] Download timed out for session %s", session_id)
        return "Video download timed out"
    except requests.exceptions.HTTPError as e:
        log.error("[JOB] HTTP error downloading video: %s", e)
        return f"Video download HTTP error: {e}"
    except Exception as e:
        log.error("[JOB] Download failed: %s", e)
        return f"Video download failed: {e}"


def _split_answers(answers):
    """Split answers into pipeline inputs and placeholder reports.

    Returns ``(final_questions, placeholder_reports)``. Unanswered questions
    and answered ones whose end time is not after their start time only get
    a placeholder report.
    """
    final_questions = []
    placeholder_reports = []
    for q in answers:
        qid = q.get("aiQuestionId")
        if not q.get("isAnswered"):
            status = "skipped" if q.get("isSkipped") else "failed"
            placeholder_reports.append(_placeholder_report(qid, status))
            continue

        start_s = time_to_seconds(q.get("startedAt", "00:00:00"))
        end_s = time_to_seconds(q.get("submittedAt", "00:00:00"))
        if end_s <= start_s:
            log.warning(
                "[JOB] Q%s: end_time (%d) <= start_time (%d) – treating as unanswered",
                qid, end_s, start_s,
            )
            placeholder_reports.append(_placeholder_report(qid, "invalid_timing"))
            continue

        question_text = (q.get("questionText") or "").strip()
        expected_answer = (q.get("expectedAnswer") or "").strip()
        if not question_text:
            # Only a warning: the question is still handed to the pipeline.
            log.warning("[JOB] Q%s: empty questionText", qid)
        final_questions.append({
            "question_id": qid,
            "question_text": question_text,
            "ideal_answer": expected_answer,
            "start_time": start_s,
            "end_time": end_s,
        })
    return final_questions, placeholder_reports


def _load_report(report_path) -> list:
    """Return the ``listOfAnswerReport`` list from the pipeline report, or []."""
    if not (report_path and os.path.exists(report_path)):
        log.warning("[JOB] report.json not found – using empty results")
        return []
    try:
        with open(report_path, "r") as f:
            report = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        log.error("[JOB] Failed to parse report.json: %s", e)
        return []
    entries = report.get("listOfAnswerReport", []) if isinstance(report, dict) else None
    if not isinstance(entries, list):
        # The caller concatenates this value with another list, so anything
        # else would make the job fail right before the callback.
        log.error("[JOB] report.json has no usable listOfAnswerReport – using empty results")
        return []
    return entries


def _upload_final_video(final_video_path, session_id):
    """Upload the annotated video to Cloudinary; return its secure URL or None."""
    if not (final_video_path and os.path.exists(final_video_path)):
        log.warning("[JOB] No final video to upload")
        return None
    if os.path.getsize(final_video_path) == 0:
        log.warning("[JOB] Annotated video is 0 bytes – skipping upload")
        return None
    if not all([_cloud_name, _cloud_key, _cloud_secret]):
        log.warning("[JOB] Cloudinary not configured – skipping upload")
        return None
    try:
        upload_res = cloudinary.uploader.upload(
            final_video_path,
            public_id=f"res_{session_id}",
            folder="intervision_results",
            resource_type="video",
        )
        final_video_url = upload_res.get("secure_url")
        if not final_video_url:
            log.warning("[JOB] Cloudinary returned no secure_url")
        else:
            log.info("[JOB] Video uploaded: %s", final_video_url)
        return final_video_url
    except cloudinary.exceptions.Error as e:
        log.error("[JOB] Cloudinary upload error: %s", e)
    except Exception as e:
        log.error("[JOB] Unexpected upload error: %s", e)
    return None


def background_processing(session_data: dict):
    """Process one interview session and report the outcome to its callback URL.

    Steps: validate the inputs, download the video, split the answers into
    questions to analyse and placeholder reports, run the pipeline, upload
    the annotated video, POST the combined report, and finally remove the
    session's temporary files.

    Args:
        session_data: Dict form of an ``InterviewRequest``; the keys read are
            ``sessionId``, ``originalVideoUrl``, ``callbackBaseUrl`` and
            ``answers``.

    Failures are reported through logging and the callback payload; the
    function is not expected to raise.
    """
    session_id = session_data.get("sessionId", "unknown")
    video_url = session_data.get("originalVideoUrl")
    callback_url = session_data.get("callbackBaseUrl")
    log.info("[JOB] Started – session: %s", session_id)

    # ── 1. Validate inputs (before anything is created on disk) ──────────────
    if not callback_url:
        # Nothing can be reported without a callback target.
        log.error("[JOB] No callback URL for session %s", session_id)
        return
    if not video_url:
        log.error("[JOB] No video URL for session %s", session_id)
        _send_callback(callback_url, _failure_payload(session_id, "Missing video URL"), session_id)
        return
    if not _is_safe_session_id(session_id):
        # The id becomes part of paths that are deleted recursively during
        # cleanup, so values such as "", ".." or "/abs/path" must never
        # reach os.path.join().
        log.error("[JOB] Unsafe sessionId %r – refusing to use it in file paths", session_id)
        _send_callback(callback_url, _failure_payload(session_id, "Invalid sessionId"), session_id)
        return

    session_dir = os.path.join(RESULT_DIR, session_id)
    local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")

    try:
        os.makedirs(session_dir, exist_ok=True)

        # ── 2. Download video ────────────────────────────────────────────────
        download_error = _download_video(video_url, local_input_path, session_id)
        if download_error is not None:
            _send_callback(callback_url, _failure_payload(session_id, download_error), session_id)
            return

        # ── 3. Parse & validate question timings ─────────────────────────────
        final_questions, skipped_failed_reports = _split_answers(
            session_data.get("answers") or []
        )

        # ── 4. Run pipeline ──────────────────────────────────────────────────
        ai_results = []
        final_video_url = None
        if final_questions:
            log.info("[JOB] Running pipeline for %d questions …", len(final_questions))
            try:
                final_video_path, report_path = run_intervision_pipeline(
                    local_input_path, final_questions, session_dir
                )
                ai_results = _load_report(report_path)
                final_video_url = _upload_final_video(final_video_path, session_id)
            except Exception as e:
                # Whatever was gathered so far is kept; the callback still fires.
                log.error("[JOB] Pipeline crashed for session %s: %s", session_id, e)
        else:
            log.info("[JOB] No answered questions – skipping pipeline")

        # ── 5. Callback ──────────────────────────────────────────────────────
        final_payload = {
            "sessionId": session_id,
            "finalVideoUrl": final_video_url,
            "report": ai_results + skipped_failed_reports,
        }
        _send_callback(callback_url, final_payload, session_id)
    except Exception as e:
        # Top-level boundary of the job: make sure the caller hears about the
        # failure instead of waiting for a callback that never arrives.
        log.exception("[JOB] Unexpected failure for session %s", session_id)
        _send_callback(
            callback_url, _failure_payload(session_id, f"Processing failed: {e}"), session_id
        )
        return
    finally:
        # ── 6. Cleanup (runs on every exit path above) ───────────────────────
        _cleanup([local_input_path, session_dir])

    log.info("[JOB] Done – session: %s", session_id)
# ══════════════════════════════════════════════════════════════════════════════
# ROUTES
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/")
async def root():
    # Static status document; performs no checks of its own.
    status_document = {}
    status_document["status"] = "Intervision AI Engine Running"
    status_document["message"] = "API is working successfully"
    return status_document
@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    """
    Accepts an interview session and queues it for background processing.
    Responds immediately; results are sent to callbackBaseUrl when ready.
    """
    session_id = data.sessionId
    log.info("[API] /process-interview – session: %s", session_id)

    # The job receives a plain dict, not the model instance.
    job_input = data.dict()
    background_tasks.add_task(background_processing, job_input)

    # Acknowledge right away; the real outcome arrives via the callback.
    return {"message": "Processing started", "sessionId": session_id}
@app.post("/delete-video-by-url")
async def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a Cloudinary video by its public URL."""
    # Outcomes:
    #   200 {"status": "success"}   – Cloudinary answered "ok"
    #   400                         – videoUrl empty, or no file name in it
    #   404 {"status": "not_found"} – Cloudinary answered "not found"
    #   500 {"status": "failed"}    – any other Cloudinary answer / unexpected error
    #   502                         – the Cloudinary SDK raised an error
    #   503                         – Cloudinary credentials are not configured
    #
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    video_url = (data.videoUrl or "").strip()
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")
    if not all([_cloud_name, _cloud_key, _cloud_secret]):
        raise HTTPException(
            status_code=503,
            detail="Cloudinary is not configured on this server",
        )

    try:
        # Work on the URL path only, so a query string or fragment can never
        # leak into the public_id.
        url_parts = urlsplit(video_url).path.split("/")
        filename, dot, _extension = url_parts[-1].rpartition(".")
        if not dot or not filename:
            # No extension at all, or nothing in front of it (e.g. ".mp4").
            raise ValueError(f"Cannot parse filename from URL: {video_url!r}")

        # Uploads made by this service live in the "intervision_results"
        # folder; any other parent segment is not treated as a folder.
        folder = url_parts[-2] if len(url_parts) >= 2 and "intervision_results" in url_parts[-2] else ""
        public_id = f"{folder}/{filename}" if folder else filename
        log.info("[DELETE] Deleting public_id: %s", public_id)

        # NOTE(review): destroy() is called synchronously inside an async
        # handler, so the event loop waits while it runs – consider moving
        # it to a worker thread.
        result = cloudinary.uploader.destroy(public_id, resource_type="video")
        outcome = result.get("result")
        if outcome == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}
        if outcome == "not found":
            # Reported to the client as 404, logged as a warning only.
            log.warning("[DELETE] public_id not found on Cloudinary: %s", public_id)
            return JSONResponse(
                status_code=404,
                content={"status": "not_found", "message": f"{public_id} was not found"},
            )
        log.warning("[DELETE] Unexpected Cloudinary result: %s", result)
        return JSONResponse(status_code=500, content={"status": "failed", "details": result})
    except cloudinary.exceptions.Error as e:
        log.error("[DELETE] Cloudinary error: %s", e)
        raise HTTPException(status_code=502, detail=f"Cloudinary error: {e}")
    except ValueError as e:
        # Raised above for unparsable URLs (urlsplit itself may raise it too).
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        log.error("[DELETE] Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
# ── Entry point ───────────────────────────────────────────────────────────────
# Only runs when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed for this way of starting the app.
    import uvicorn
    # Serve the app on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)