from __future__ import annotations from flask import Flask, jsonify, request from typing import Any, Dict import time import threading import os import sys try: import fcntl except ImportError: # Windows doesn't have fcntl, use alternative locking fcntl = None from .orchestrator import FactCheckSystem from .firebase_service import FirebaseVectorSync try: from flask_cors import CORS # type: ignore except Exception: # pragma: no cover CORS = None # type: ignore # Global initialization with proper error handling system = None firebase_sync = None AUTO_INGEST = os.environ.get("AUTO_INGEST", "true").lower() in ("true", "1", "yes") LANGUAGE_FILTER = os.environ.get("LANGUAGE_FILTER", "English") # Default to English articles INGEST_STATUS: Dict[str, Any] = {"started": time.time(), "finished": False, "synced": 0} INIT_LOCK = threading.Lock() INGEST_LOCK_FILE = "/tmp/ingest.lock" if os.name != 'nt' else "ingest.lock" def _safe_initialize_system(): """Initialize system with proper error handling and singleton pattern.""" global system, firebase_sync with INIT_LOCK: if system is not None: return try: print("� Initializing fact-check system...") # Use safe directory for vector store vector_dir = "/tmp/vector_db" if os.path.exists("/data"): vector_dir = "/data/vector_db" system = FactCheckSystem(vector_dir=vector_dir) firebase_sync = FirebaseVectorSync() print("✅ System initialized successfully") except Exception as e: print(f"❌ System initialization failed: {e}") # Create minimal fallback system try: system = FactCheckSystem(vector_dir="/tmp/vector_db_fallback") firebase_sync = None print("⚠️ Using fallback system") except Exception as fallback_error: print(f"❌ Even fallback failed: {fallback_error}") system = None firebase_sync = None def _is_ingest_locked(): """Check if another process is already running ingestion.""" try: return os.path.exists(INGEST_LOCK_FILE) except: return False def _acquire_ingest_lock(): """Acquire lock for ingestion process.""" try: if fcntl: # 
Unix-like systems with open(INGEST_LOCK_FILE, 'w') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) f.write(str(os.getpid())) return True else: # Windows or systems without fcntl if os.path.exists(INGEST_LOCK_FILE): return False with open(INGEST_LOCK_FILE, 'w') as f: f.write(str(os.getpid())) return True except (IOError, OSError): return False def _release_ingest_lock(): """Release ingestion lock.""" try: if os.path.exists(INGEST_LOCK_FILE): os.remove(INGEST_LOCK_FILE) except: pass def _cleanup_stale_locks(): """Clean up stale lock files from previous runs.""" try: if os.path.exists(INGEST_LOCK_FILE): # Check if the process that created the lock is still running try: with open(INGEST_LOCK_FILE, 'r') as f: pid = int(f.read().strip()) # On Unix systems, check if process exists if os.name != 'nt': try: os.kill(pid, 0) # Signal 0 checks if process exists # Process exists, don't remove lock return except OSError: # Process doesn't exist, remove stale lock print(f"🗑️ Removing stale lock file (PID {pid} no longer exists)") os.remove(INGEST_LOCK_FILE) else: # On Windows, just remove old locks after reasonable time lock_age = time.time() - os.path.getmtime(INGEST_LOCK_FILE) if lock_age > 300: # 5 minutes print(f"🗑️ Removing old lock file (age: {lock_age:.0f}s)") os.remove(INGEST_LOCK_FILE) except (ValueError, FileNotFoundError): # Invalid lock file, remove it print("🗑️ Removing invalid lock file") os.remove(INGEST_LOCK_FILE) except Exception as e: print(f"⚠️ Could not clean up lock file: {e}") def _background_ingest() -> None: """Background ingestion with proper locking and error handling.""" global system # Clean up any stale lock files first _cleanup_stale_locks() # Check if another worker is already doing ingestion if _is_ingest_locked(): print("⏳ Another process is handling ingestion, skipping...") INGEST_STATUS.update({"finished": True, "skipped": True}) return # Try to acquire lock if not _acquire_ingest_lock(): print("⏳ Could not acquire ingestion lock, 
skipping...") INGEST_STATUS.update({"finished": True, "skipped": True}) return try: _safe_initialize_system() if system is None: print("❌ System not initialized, cannot perform ingestion") INGEST_STATUS.update({"finished": True, "error": "System initialization failed"}) return print(f"🚀 Refreshing vector store with latest {LANGUAGE_FILTER} Firebase articles...") # Add retry logic with exponential backoff for Firebase API max_retries = 3 base_delay = 5 # Start with 5 seconds for attempt in range(max_retries): try: # Use conservative limit to avoid rate limiting limit = min(500, 5000) # Further reduced initial batch # Use dedicated English articles collection for better results if LANGUAGE_FILTER.lower() in ["english", "en"]: ingest_res = system.ingest_firebase( collection="english_articles", limit=limit, language=LANGUAGE_FILTER ) else: ingest_res = system.ingest_firebase( collection="articles", limit=limit, language=LANGUAGE_FILTER ) if ingest_res.get("success"): print(f"🎯 Ingestion successful! 
Result: {ingest_res}") INGEST_STATUS.update({"finished": True, **ingest_res}) print(f"🎯 Updated INGEST_STATUS: {INGEST_STATUS}") print(f"✅ Startup ingestion complete: {ingest_res.get('synced')} {ingest_res.get('language', LANGUAGE_FILTER)} articles") break else: error_msg = ingest_res.get("error", "Unknown error") if "429" in str(error_msg) and attempt < max_retries - 1: delay = base_delay * (2 ** attempt) # Exponential backoff print(f"⏳ Rate limited, waiting {delay}s before retry {attempt + 1}/{max_retries}") time.sleep(delay) continue else: print(f"⚠️ Startup ingestion failed: {error_msg}") INGEST_STATUS.update({"finished": True, "error": error_msg}) break except Exception as e: if "429" in str(e) and attempt < max_retries - 1: delay = base_delay * (2 ** attempt) print(f"⏳ Rate limited (exception), waiting {delay}s before retry {attempt + 1}/{max_retries}") time.sleep(delay) continue else: print(f"❌ Ingestion attempt {attempt + 1} failed: {e}") INGEST_STATUS.update({"finished": True, "error": str(e)}) break # Log LLM availability try: if system and system.analyzer and system.analyzer.client: print(f"🤖 LLM active: model={system.cfg.model} max_tokens={system.cfg.max_tokens}") else: print("⚠️ No LLM API key detected. 
Using heuristic fallback.") except Exception as e: print(f"⚠️ Could not check LLM status: {e}") except Exception as e: INGEST_STATUS.update({"finished": True, "error": str(e)}) print(f"❌ Startup ingestion failed: {e}") finally: _release_ingest_lock() def _start_ingest_thread() -> None: """Start ingestion thread with proper conditions.""" if not AUTO_INGEST: print("⏭️ Auto-ingestion disabled") return # Only start if we're not already finished if INGEST_STATUS.get("finished"): print("📋 Ingestion already completed") return # Force start in production environments in_production = any([ os.path.exists("/app"), # Docker container "gunicorn" in str(sys.argv), # Gunicorn process os.environ.get("PORT") == "7860", # HuggingFace Spaces ]) if in_production: print("🚀 Production environment detected, forcing background ingestion...") t = threading.Thread(target=_background_ingest, name="firebase-ingest", daemon=True) t.start() print("🔄 Started background ingestion thread") app = Flask(__name__) if CORS: CORS(app, resources={r"/*": {"origins": "*"}}) start_time = time.time() # Initialize system safely _safe_initialize_system() # Start ingestion in background only for the main process # Force ingestion in production environments or when explicitly requested should_start_ingestion = ( AUTO_INGEST and (os.environ.get('WERKZEUG_RUN_MAIN') != 'true' or not hasattr(os, 'fork')) ) # Also force start in production regardless of WERKZEUG flags if AUTO_INGEST and any([ os.path.exists("/app"), # Docker container os.environ.get("PORT") == "7860", # HuggingFace Spaces ]): should_start_ingestion = True print("🚀 Production environment detected, enabling ingestion") if should_start_ingestion: _start_ingest_thread() @app.route('/health') def health() -> Any: """Health check with system status and optional ingestion trigger.""" global system # Basic health info health_data = { "status": "ok", "uptime_sec": round(time.time()-start_time, 2), "ingestion_status": INGEST_STATUS.copy() } # Check if we 
have data in the vector store (lightweight) try: if system is None: _safe_initialize_system() if system: vector_count = 0 try: vector_count = system.retriever.get_vector_count() except Exception as _: vector_count = 0 health_data["vector_store_populated"] = vector_count > 0 health_data["vector_count"] = vector_count # If no data and ingestion hasn't finished, provide more info if vector_count == 0 and not INGEST_STATUS.get("finished"): health_data["status"] = "initializing" health_data["message"] = "Vector store empty, ingestion in progress" elif vector_count == 0 and INGEST_STATUS.get("finished"): health_data["status"] = "warning" health_data["message"] = "Vector store empty after ingestion completion" # Trigger re-ingestion if requested trigger_reingestion = request.args.get('trigger_ingestion', '').lower() in ['true', '1', 'yes'] if trigger_reingestion and AUTO_INGEST: print("🔄 Health check triggering manual ingestion...") INGEST_STATUS.update({"finished": False, "manual_trigger": True}) _start_ingest_thread() health_data["message"] = "Re-ingestion triggered" else: health_data["status"] = "error" health_data["message"] = "System initialization failed" except Exception as e: health_data["status"] = "error" health_data["error"] = str(e) return jsonify(health_data) @app.route('/debug-status', methods=['GET']) def debug_status() -> Any: """Debug endpoint to check ingestion status.""" try: global system if system is None: _safe_initialize_system() vector_count = 0 if system and system.retriever: vector_count = system.retriever.get_vector_count() debug_data = { "ingest_status": dict(INGEST_STATUS), "vector_count": vector_count, "system_initialized": system is not None, "auto_ingest": AUTO_INGEST, "language_filter": LANGUAGE_FILTER } return jsonify(debug_data) except Exception as e: return jsonify({"error": str(e)}), 500 ## Simplified API: only /health and /fact-check provided. Data ingestion occurs automatically on startup. 
def _run_fact_check(claim: str):  # internal helper
    """Run a fact check for *claim*, returning ``(payload, http_status)``.

    Returns 503 while the system is uninitialized or ingestion is still in
    progress (and the vector store is empty); 500 on analysis failure.
    """
    global system
    # Ensure system is initialized.
    if system is None:
        _safe_initialize_system()
    if system is None:
        return {
            "verdict": "ERROR",
            "reasoning": "System initialization failed. Please try again later.",
            "confidence": 0.0,
        }, 503

    # Debug the status check with detailed logging.
    ingest_finished = INGEST_STATUS.get("finished", False)
    vector_count = 0
    try:
        if system and system.retriever:
            vector_count = system.retriever.get_vector_count()
    except Exception:
        pass
    print(f"🔍 Status check - INGEST_STATUS: {INGEST_STATUS}")
    print(f"🔍 Status check - finished={ingest_finished}, type={type(ingest_finished)}")
    print(f"🔍 Status check - vector_count={vector_count}")

    # Check both status flag AND vector count as backup.
    if not ingest_finished and vector_count == 0:
        return {
            "verdict": "INITIALIZING",
            "reasoning": "Ingestion still in progress. Try again soon.",
            "confidence": 0.0,
        }, 503
    # If vector DB has documents but status says not finished, update status.
    if not ingest_finished and vector_count > 0:
        print("🔧 Fixing status: vector DB has documents but status not updated")
        INGEST_STATUS.update({"finished": True, "synced": vector_count, "fixed": True})
        ingest_finished = True

    try:
        result = system.fact_check(claim)
        # A 402 from the LLM provider means quota exhaustion, not a bad claim:
        # downgrade to UNVERIFIED so callers don't treat it as a hard error.
        if result.get('verdict') == 'ERROR' and '402' in result.get('reasoning', ''):
            result['verdict'] = 'UNVERIFIED'
            result['reasoning'] = 'LLM quota/credits insufficient. Retrieval performed; provide API key to enable full analysis.'
        return result, 200
    except Exception as e:
        print(f"❌ Fact check error: {e}")
        return {
            "verdict": "ERROR",
            "reasoning": f"Analysis failed: {str(e)}",
            "confidence": 0.0,
        }, 500


@app.route('/fact-check', methods=['POST', 'GET'])
def fact_check() -> Any:
    """Fact-check endpoint accepting the claim via many transports.

    GET: ``?claim=`` or ``?text=``. POST: form-data/urlencoded fields
    ``claim``/``text`` (or an uploaded file), JSON ``{claim}``/``{text}``,
    raw text/plain body, or query-string fallback.
    """
    claim: Any = None
    if request.method == 'GET':
        claim = request.args.get('claim') or request.args.get('text')
    else:  # POST
        content_type = (request.content_type or '').lower()
        # Prefer form-data / urlencoded strictly when provided (Postman Body: form-data).
        if 'multipart/form-data' in content_type or 'application/x-www-form-urlencoded' in content_type:
            # Form fields first.
            claim = request.form.get('claim') or request.form.get('text')
            # Then file fields.
            if not claim and request.files:
                f = (request.files.get('file')
                     or request.files.get('claim')
                     or request.files.get('claim_file'))
                if f:
                    try:
                        claim = (f.read() or b'').decode('utf-8', errors='ignore').strip()
                    except Exception:
                        claim = None
            if not claim:
                return jsonify({"error": "Please provide claim via form-data: key 'claim' or 'text' (or upload file as 'file')."}), 400
        else:
            # JSON payload.
            payload = request.get_json(silent=True) or {}
            claim = payload.get('claim') or payload.get('text')
            # Raw text/plain.
            if not claim and 'text/plain' in content_type:
                raw = request.get_data(cache=False, as_text=True) or ''
                claim = raw.strip()
            # Query fallback.
            if not claim:
                claim = request.args.get('claim') or request.args.get('text')
    if not claim:
        return jsonify({"error": "claim parameter or JSON field 'claim' required"}), 400
    result, code = _run_fact_check(claim)
    return jsonify(result), code


@app.route('/')
def index():
    """API index; also allows GET /?claim=... for Postman users."""
    q_claim = request.args.get('claim')
    if q_claim:
        result, code = _run_fact_check(q_claim)
        return jsonify(result), code
    return jsonify({
        "name": "CVE Fact Checker API (Simplified)",
        "status": {
            "ingestion_finished": INGEST_STATUS.get("finished"),
            "synced_articles": INGEST_STATUS.get("synced"),
            "ingestion_error": INGEST_STATUS.get("error"),
        },
        "endpoints": [
            "GET /health",
            "GET /fact-check?claim=...",
            "POST /fact-check (JSON: {claim}, form-data: claim|text, x-www-form-urlencoded, text/plain, or file upload)",
            "GET /?claim=... (alias)",
        ],
    })


if __name__ == '__main__':
    port = int(os.environ.get('PORT', '7860'))
    app.run(host='0.0.0.0', port=port)