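"""Flask entry point for the CVE Fact Checker API.

Exposes three routes: GET /health, GET/POST /fact-check, and GET / (index and
?claim= alias). On startup the module initializes the FactCheckSystem singleton
and, unless disabled, launches a background thread that ingests Firebase
articles into the vector store, guarded by a file-based lock so only one worker
performs ingestion.
"""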
from __future__ import annotations

from flask import Flask, jsonify, request
from typing import Any, Dict
import time
import threading
import os

try:
    import fcntl
except ImportError:
    # Windows doesn't have fcntl, use alternative locking
    fcntl = None

from .orchestrator import FactCheckSystem
from .firebase_service import FirebaseVectorSync

try:
    from flask_cors import CORS  # type: ignore
except Exception:  # pragma: no cover
    CORS = None  # type: ignore

# Global initialization with proper error handling
system = None
firebase_sync = None

AUTO_INGEST = os.environ.get("AUTO_INGEST", "true").lower() in ("true", "1", "yes")
LANGUAGE_FILTER = os.environ.get("LANGUAGE_FILTER", "English")  # Default to English articles
INGEST_STATUS: Dict[str, Any] = {"started": time.time(), "finished": False, "synced": 0}
INIT_LOCK = threading.Lock()
INGEST_LOCK_FILE = "/tmp/ingest.lock" if os.name != 'nt' else "ingest.lock"
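# Configuration is driven entirely by environment variables. Example values
# (illustrative only; the defaults in the assignments above apply otherwise):
#   AUTO_INGEST=false        -> skip the startup Firebase ingestion
#   LANGUAGE_FILTER=English  -> language of articles pulled from Firebase
#   PORT=7860                -> port used by the __main__ dev-server block at the bottom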
def _safe_initialize_system():
    """Initialize system with proper error handling and singleton pattern."""
    global system, firebase_sync
    with INIT_LOCK:
        if system is not None:
            return
        try:
            print("🔄 Initializing fact-check system...")
            # Use safe directory for vector store
            vector_dir = "/tmp/vector_db"
            if os.path.exists("/data"):
                vector_dir = "/data/vector_db"
            system = FactCheckSystem(vector_dir=vector_dir)
            firebase_sync = FirebaseVectorSync()
            print("✅ System initialized successfully")
        except Exception as e:
            print(f"❌ System initialization failed: {e}")
            # Create minimal fallback system
            try:
                system = FactCheckSystem(vector_dir="/tmp/vector_db_fallback")
                firebase_sync = None
                print("⚠️ Using fallback system")
            except Exception as fallback_error:
                print(f"❌ Even fallback failed: {fallback_error}")
                system = None
                firebase_sync = None
def _is_ingest_locked():
    """Check if another process is already running ingestion."""
    try:
        return os.path.exists(INGEST_LOCK_FILE)
    except OSError:
        return False


def _acquire_ingest_lock():
    """Acquire lock for ingestion process.

    The lock file's existence is the primary signal; flock (where available)
    only guards against a race while the file is being created.
    """
    try:
        if fcntl:  # Unix-like systems
            with open(INGEST_LOCK_FILE, 'w') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                f.write(str(os.getpid()))
            return True
        else:  # Windows or systems without fcntl
            if os.path.exists(INGEST_LOCK_FILE):
                return False
            with open(INGEST_LOCK_FILE, 'w') as f:
                f.write(str(os.getpid()))
            return True
    except (IOError, OSError):
        return False


def _release_ingest_lock():
    """Release ingestion lock."""
    try:
        if os.path.exists(INGEST_LOCK_FILE):
            os.remove(INGEST_LOCK_FILE)
    except OSError:
        pass
def _background_ingest() -> None:
    """Background ingestion with proper locking and error handling."""
    global system

    # Check if another worker is already doing ingestion
    if _is_ingest_locked():
        print("⏳ Another process is handling ingestion, skipping...")
        INGEST_STATUS.update({"finished": True, "skipped": True})
        return

    # Try to acquire lock
    if not _acquire_ingest_lock():
        print("⏳ Could not acquire ingestion lock, skipping...")
        INGEST_STATUS.update({"finished": True, "skipped": True})
        return

    try:
        _safe_initialize_system()
        if system is None:
            print("❌ System not initialized, cannot perform ingestion")
            INGEST_STATUS.update({"finished": True, "error": "System initialization failed"})
            return

        print(f"🔄 Refreshing vector store with latest {LANGUAGE_FILTER} Firebase articles...")

        # Add retry logic with exponential backoff for Firebase API
        max_retries = 3
        base_delay = 5  # Start with 5 seconds
        for attempt in range(max_retries):
            try:
                # Use a smaller batch to avoid rate limiting, filtered to the configured language
                limit = 1000
                ingest_res = system.ingest_firebase(limit=limit, language=LANGUAGE_FILTER)
                if ingest_res.get("success"):
                    INGEST_STATUS.update({"finished": True, **ingest_res})
                    print(f"✅ Startup ingestion complete: {ingest_res.get('synced')} {ingest_res.get('language', LANGUAGE_FILTER)} articles")
                    break
                else:
                    error_msg = ingest_res.get("error", "Unknown error")
                    if "429" in str(error_msg) and attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)  # Exponential backoff
                        print(f"⏳ Rate limited, waiting {delay}s before retry {attempt + 1}/{max_retries}")
                        time.sleep(delay)
                        continue
                    else:
                        print(f"⚠️ Startup ingestion failed: {error_msg}")
                        INGEST_STATUS.update({"finished": True, "error": error_msg})
                        break
            except Exception as e:
                if "429" in str(e) and attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"⏳ Rate limited (exception), waiting {delay}s before retry {attempt + 1}/{max_retries}")
                    time.sleep(delay)
                    continue
                else:
                    raise

        # Log LLM availability
        try:
            if system and system.analyzer and system.analyzer.client:
                print(f"🤖 LLM active: model={system.cfg.model} max_tokens={system.cfg.max_tokens}")
            else:
                print("⚠️ No LLM API key detected. Using heuristic fallback.")
        except Exception as e:
            print(f"⚠️ Could not check LLM status: {e}")
    except Exception as e:
        INGEST_STATUS.update({"finished": True, "error": str(e)})
        print(f"❌ Startup ingestion failed: {e}")
    finally:
        _release_ingest_lock()
def _start_ingest_thread() -> None:
    """Start ingestion thread with proper conditions."""
    if not AUTO_INGEST:
        print("⏸️ Auto-ingestion disabled")
        return
    # Only start if we're not already finished
    if INGEST_STATUS.get("finished"):
        return
    t = threading.Thread(target=_background_ingest, name="firebase-ingest", daemon=True)
    t.start()
    print("🚀 Started background ingestion thread")
app = Flask(__name__)
if CORS:
    CORS(app, resources={r"/*": {"origins": "*"}})

start_time = time.time()

# Initialize system safely
_safe_initialize_system()

# Start ingestion in background only for the main process
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true' or not hasattr(os, 'fork'):
    _start_ingest_thread()
@app.route("/health", methods=["GET"])
def health() -> Any:
    return jsonify({"status": "ok", "uptime_sec": round(time.time() - start_time, 2)})


# Simplified API: only /health and /fact-check are provided. Data ingestion occurs automatically on startup.
def _run_fact_check(claim: str):  # internal helper
    """Run fact check with proper error handling."""
    global system
    # Ensure system is initialized
    if system is None:
        _safe_initialize_system()
        if system is None:
            return {"verdict": "ERROR", "reasoning": "System initialization failed. Please try again later.", "confidence": 0.0}, 503
    if not INGEST_STATUS.get("finished"):
        return {"verdict": "INITIALIZING", "reasoning": "Ingestion still in progress. Try again soon.", "confidence": 0.0}, 503
    try:
        result = system.fact_check(claim)
        if result.get('verdict') == 'ERROR' and '402' in result.get('reasoning', ''):
            result['verdict'] = 'UNVERIFIED'
            result['reasoning'] = 'LLM quota/credits insufficient. Retrieval performed; provide API key to enable full analysis.'
        return result, 200
    except Exception as e:
        print(f"❌ Fact check error: {e}")
        return {"verdict": "ERROR", "reasoning": f"Analysis failed: {str(e)}", "confidence": 0.0}, 500
@app.route("/fact-check", methods=["GET", "POST"])
def fact_check() -> Any:
    claim: Any = None
    if request.method == 'GET':
        claim = request.args.get('claim') or request.args.get('text')
    else:  # POST
        content_type = (request.content_type or '').lower()
        # Prefer form-data / urlencoded strictly when provided (Postman Body: form-data)
        if 'multipart/form-data' in content_type or 'application/x-www-form-urlencoded' in content_type:
            # form fields first
            claim = request.form.get('claim') or request.form.get('text')
            # then file fields
            if not claim and request.files:
                f = (request.files.get('file') or request.files.get('claim') or request.files.get('claim_file'))
                if f:
                    try:
                        claim = (f.read() or b'').decode('utf-8', errors='ignore').strip()
                    except Exception:
                        claim = None
            if not claim:
                return jsonify({"error": "Please provide claim via form-data: key 'claim' or 'text' (or upload file as 'file')."}), 400
        else:
            # JSON payload
            payload = request.get_json(silent=True) or {}
            claim = payload.get('claim') or payload.get('text')
            # raw text/plain
            if not claim and 'text/plain' in content_type:
                raw = request.get_data(cache=False, as_text=True) or ''
                claim = raw.strip()
            # query fallback
            if not claim:
                claim = request.args.get('claim') or request.args.get('text')
    if not claim:
        return jsonify({"error": "claim parameter or JSON field 'claim' required"}), 400
    result, code = _run_fact_check(claim)
    return jsonify(result), code
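# Example requests (illustrative only; assumes the server is reachable at http://localhost:7860):
#   curl "http://localhost:7860/fact-check?claim=CVE-2021-44228+affects+Log4j"
#   curl -X POST http://localhost:7860/fact-check \
#        -H "Content-Type: application/json" \
#        -d '{"claim": "CVE-2021-44228 affects Log4j"}'
#   curl -X POST http://localhost:7860/fact-check -F "claim=CVE-2021-44228 affects Log4j"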
@app.route("/", methods=["GET"])
def index():
    # Convenience: allow GET /?claim=... for Postman users
    q_claim = request.args.get('claim')
    if q_claim:
        result, code = _run_fact_check(q_claim)
        return jsonify(result), code
    return jsonify({
        "name": "CVE Fact Checker API (Simplified)",
        "status": {
            "ingestion_finished": INGEST_STATUS.get("finished"),
            "synced_articles": INGEST_STATUS.get("synced"),
            "ingestion_error": INGEST_STATUS.get("error")
        },
        "endpoints": [
            "GET /health",
            "GET /fact-check?claim=...",
            "POST /fact-check (JSON: {claim}, form-data: claim|text, x-www-form-urlencoded, text/plain, or file upload)",
            "GET /?claim=... (alias)"
        ]
    })
if __name__ == '__main__':
    port = int(os.environ.get('PORT', '7860'))
    app.run(host='0.0.0.0', port=port)
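# In production (e.g., on a Hugging Face Space) this module is normally served by a
# WSGI server rather than the Flask dev server above; the exact command depends on
# the package layout, for example (hypothetical module path):
#   gunicorn -b 0.0.0.0:7860 yourpackage.app:app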