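"""Simplified Flask API for the CVE fact-checking service.

Exposes two endpoints, /health and /fact-check, plus a GET / alias that
accepts ?claim=... . On startup the app optionally ingests the latest
Firebase articles (filtered by LANGUAGE_FILTER) into the vector store in a
background thread, guarded by a file lock so only one worker performs the
ingestion.
"""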
from __future__ import annotations
from flask import Flask, jsonify, request
from typing import Any, Dict
import time
import threading
import os
try:
import fcntl
except ImportError:
# Windows doesn't have fcntl, use alternative locking
fcntl = None
from .orchestrator import FactCheckSystem
from .firebase_service import FirebaseVectorSync
try:
from flask_cors import CORS # type: ignore
except Exception: # pragma: no cover
CORS = None # type: ignore
# Global initialization with proper error handling
system = None
firebase_sync = None
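# AUTO_INGEST toggles the background Firebase ingestion that runs at startup;
# LANGUAGE_FILTER restricts ingestion to articles in the given language.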
AUTO_INGEST = os.environ.get("AUTO_INGEST", "true").lower() in ("true", "1", "yes")
LANGUAGE_FILTER = os.environ.get("LANGUAGE_FILTER", "English") # Default to English articles
INGEST_STATUS: Dict[str, Any] = {"started": time.time(), "finished": False, "synced": 0}
INIT_LOCK = threading.Lock()
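# File-based lock so that only one worker process (e.g. one gunicorn worker)
# runs the startup ingestion; /tmp is used where available.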
INGEST_LOCK_FILE = "/tmp/ingest.lock" if os.name != 'nt' else "ingest.lock"
def _safe_initialize_system():
"""Initialize system with proper error handling and singleton pattern."""
global system, firebase_sync
with INIT_LOCK:
if system is not None:
return
try:
print("οΏ½ Initializing fact-check system...")
# Use safe directory for vector store
vector_dir = "/tmp/vector_db"
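            # Prefer the persistent /data volume when it is mounted (e.g. a
            # Hugging Face Space with persistent storage); otherwise fall back
            # to the ephemeral /tmp directory.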
if os.path.exists("/data"):
vector_dir = "/data/vector_db"
system = FactCheckSystem(vector_dir=vector_dir)
firebase_sync = FirebaseVectorSync()
print("βœ… System initialized successfully")
except Exception as e:
print(f"❌ System initialization failed: {e}")
# Create minimal fallback system
try:
system = FactCheckSystem(vector_dir="/tmp/vector_db_fallback")
firebase_sync = None
print("⚠️ Using fallback system")
except Exception as fallback_error:
print(f"❌ Even fallback failed: {fallback_error}")
system = None
firebase_sync = None
def _is_ingest_locked():
"""Check if another process is already running ingestion."""
try:
return os.path.exists(INGEST_LOCK_FILE)
    except OSError:
        return False
def _acquire_ingest_lock():
    """Acquire the ingestion lock by creating the lock file."""
    try:
        # Bail out early if another worker already created the lock file.
        if os.path.exists(INGEST_LOCK_FILE):
            return False
        if fcntl:  # Unix-like systems: flock guards the create/write window
            with open(INGEST_LOCK_FILE, 'w') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                f.write(str(os.getpid()))
            # The flock is released when the file closes; the file's existence
            # (checked above and in _is_ingest_locked) keeps other workers out.
            return True
        else:  # Windows or systems without fcntl
            with open(INGEST_LOCK_FILE, 'w') as f:
                f.write(str(os.getpid()))
            return True
    except (IOError, OSError):
        return False
def _release_ingest_lock():
"""Release ingestion lock."""
try:
if os.path.exists(INGEST_LOCK_FILE):
os.remove(INGEST_LOCK_FILE)
    except OSError:
        pass
def _background_ingest() -> None:
"""Background ingestion with proper locking and error handling."""
global system
# Check if another worker is already doing ingestion
if _is_ingest_locked():
print("⏳ Another process is handling ingestion, skipping...")
INGEST_STATUS.update({"finished": True, "skipped": True})
return
# Try to acquire lock
if not _acquire_ingest_lock():
print("⏳ Could not acquire ingestion lock, skipping...")
INGEST_STATUS.update({"finished": True, "skipped": True})
return
try:
_safe_initialize_system()
if system is None:
print("❌ System not initialized, cannot perform ingestion")
INGEST_STATUS.update({"finished": True, "error": "System initialization failed"})
return
print(f"πŸš€ Refreshing vector store with latest {LANGUAGE_FILTER} Firebase articles...")
# Add retry logic with exponential backoff for Firebase API
max_retries = 3
base_delay = 5 # Start with 5 seconds
for attempt in range(max_retries):
try:
                # Start with a smaller batch to avoid Firebase rate limiting;
                # articles are filtered to the configured language below.
                limit = 1000
ingest_res = system.ingest_firebase(limit=limit, language=LANGUAGE_FILTER)
if ingest_res.get("success"):
INGEST_STATUS.update({"finished": True, **ingest_res})
print(f"βœ… Startup ingestion complete: {ingest_res.get('synced')} {ingest_res.get('language', LANGUAGE_FILTER)} articles")
break
else:
error_msg = ingest_res.get("error", "Unknown error")
if "429" in str(error_msg) and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt) # Exponential backoff
print(f"⏳ Rate limited, waiting {delay}s before retry {attempt + 1}/{max_retries}")
time.sleep(delay)
continue
else:
print(f"⚠️ Startup ingestion failed: {error_msg}")
INGEST_STATUS.update({"finished": True, "error": error_msg})
break
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f"⏳ Rate limited (exception), waiting {delay}s before retry {attempt + 1}/{max_retries}")
time.sleep(delay)
continue
else:
                    raise  # re-raise unchanged so the outer handler records the error
# Log LLM availability
try:
if system and system.analyzer and system.analyzer.client:
print(f"πŸ€– LLM active: model={system.cfg.model} max_tokens={system.cfg.max_tokens}")
else:
print("⚠️ No LLM API key detected. Using heuristic fallback.")
except Exception as e:
print(f"⚠️ Could not check LLM status: {e}")
except Exception as e:
INGEST_STATUS.update({"finished": True, "error": str(e)})
print(f"❌ Startup ingestion failed: {e}")
finally:
_release_ingest_lock()
def _start_ingest_thread() -> None:
"""Start ingestion thread with proper conditions."""
if not AUTO_INGEST:
print("⏭️ Auto-ingestion disabled")
return
# Only start if we're not already finished
if INGEST_STATUS.get("finished"):
return
t = threading.Thread(target=_background_ingest, name="firebase-ingest", daemon=True)
t.start()
print("πŸ”„ Started background ingestion thread")
app = Flask(__name__)
if CORS:
CORS(app, resources={r"/*": {"origins": "*"}})
start_time = time.time()
# Initialize system safely
_safe_initialize_system()
# Start ingestion in background only for the main process
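# WERKZEUG_RUN_MAIN is set to 'true' only inside the Werkzeug reloader's child
# process, so this guard avoids starting a duplicate ingest thread under the
# dev server; platforms without os.fork (e.g. Windows) always start it here.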
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true' or not hasattr(os, 'fork'):
_start_ingest_thread()
@app.route('/health')
def health() -> Any:
return jsonify({"status": "ok", "uptime_sec": round(time.time()-start_time,2)})
## Simplified API: only /health and /fact-check provided. Data ingestion occurs automatically on startup.
def _run_fact_check(claim: str): # internal helper
"""Run fact check with proper error handling."""
global system
# Ensure system is initialized
if system is None:
_safe_initialize_system()
if system is None:
return {"verdict": "ERROR", "reasoning": "System initialization failed. Please try again later.", "confidence": 0.0}, 503
if not INGEST_STATUS.get("finished"):
return {"verdict": "INITIALIZING", "reasoning": "Ingestion still in progress. Try again soon.", "confidence": 0.0}, 503
try:
result = system.fact_check(claim)
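        # Downgrade an LLM-provider 402 (insufficient quota/credits) to
        # UNVERIFIED so the retrieval-only result is still returned.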
if result.get('verdict') == 'ERROR' and '402' in result.get('reasoning',''):
result['verdict'] = 'UNVERIFIED'
result['reasoning'] = 'LLM quota/credits insufficient. Retrieval performed; provide API key to enable full analysis.'
return result, 200
except Exception as e:
print(f"❌ Fact check error: {e}")
return {"verdict": "ERROR", "reasoning": f"Analysis failed: {str(e)}", "confidence": 0.0}, 500
@app.route('/fact-check', methods=['POST','GET'])
def fact_check() -> Any:
claim: Any = None
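    # Accepted inputs: GET query string, or for POST: form fields, an uploaded
    # file, a JSON body, raw text/plain, then the query string as a fallback.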
if request.method == 'GET':
claim = request.args.get('claim') or request.args.get('text')
else: # POST
content_type = (request.content_type or '').lower()
# Prefer form-data / urlencoded strictly when provided (Postman Body: form-data)
if 'multipart/form-data' in content_type or 'application/x-www-form-urlencoded' in content_type:
# form fields first
claim = request.form.get('claim') or request.form.get('text')
# then file fields
if not claim and request.files:
f = (request.files.get('file') or request.files.get('claim') or request.files.get('claim_file'))
if f:
try:
claim = (f.read() or b'').decode('utf-8', errors='ignore').strip()
except Exception:
claim = None
if not claim:
return jsonify({"error": "Please provide claim via form-data: key 'claim' or 'text' (or upload file as 'file')."}), 400
else:
# JSON payload
payload = request.get_json(silent=True) or {}
claim = payload.get('claim') or payload.get('text')
# raw text/plain
if not claim and 'text/plain' in content_type:
raw = request.get_data(cache=False, as_text=True) or ''
claim = raw.strip()
# query fallback
if not claim:
claim = request.args.get('claim') or request.args.get('text')
if not claim:
return jsonify({"error": "claim parameter or JSON field 'claim' required"}), 400
result, code = _run_fact_check(claim)
return jsonify(result), code
@app.route('/')
def index():
# Convenience: allow GET /?claim=... for Postman users
q_claim = request.args.get('claim')
if q_claim:
result, code = _run_fact_check(q_claim)
return jsonify(result), code
return jsonify({
"name": "CVE Fact Checker API (Simplified)",
"status": {
"ingestion_finished": INGEST_STATUS.get("finished"),
"synced_articles": INGEST_STATUS.get("synced"),
"ingestion_error": INGEST_STATUS.get("error")
},
"endpoints": [
"GET /health",
"GET /fact-check?claim=...",
"POST /fact-check (JSON: {claim}, form-data: claim|text, x-www-form-urlencoded, text/plain, or file upload)",
"GET /?claim=... (alias)"
]
})
if __name__ == '__main__':
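    # 7860 is the conventional Hugging Face Spaces port; override with PORT.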
port = int(os.environ.get('PORT', '7860'))
app.run(host='0.0.0.0', port=port)