NLPGenius committed
Commit 16779bb · 1 Parent(s): d004d70

vectore stor & firebase api error

Files changed (1)
  1. cve_factchecker/app.py +179 -23
cve_factchecker/app.py CHANGED
@@ -3,6 +3,12 @@ from flask import Flask, jsonify, request
 from typing import Any, Dict
 import time
 import threading
 from .orchestrator import FactCheckSystem
 from .firebase_service import FirebaseVectorSync
 try:
@@ -10,44 +16,179 @@ try:
 except Exception: # pragma: no cover
     CORS = None # type: ignore
 
-system = FactCheckSystem()
-firebase_sync = FirebaseVectorSync()
-
-import os
-AUTO_INGEST = True # Always ingest on startup for seamless experience
 INGEST_STATUS: Dict[str, Any] = {"started": time.time(), "finished": False, "synced": 0}
 
 def _background_ingest() -> None:
     try:
-        print("πŸš€ Refreshing vector store with latest 5000 Firebase articles...")
-        ingest_res = system.ingest_firebase(limit=5000) # Use orchestrator method with 5000 limit
-        INGEST_STATUS.update({"finished": True, **ingest_res})
-        if not ingest_res.get("success"):
-            print("⚠️ Startup ingestion did not succeed:", ingest_res.get("error"))
-        else:
-            print(f"βœ… Startup ingestion complete: {ingest_res.get('synced')} articles")
         # Log LLM availability
-        if system.analyzer.client:
-            print(f"πŸ€– LLM active: model={system.cfg.model} max_tokens={system.cfg.max_tokens}")
-        else:
-            print("⚠️ No LLM API key detected. Using heuristic fallback.")
     except Exception as e:
         INGEST_STATUS.update({"finished": True, "error": str(e)})
         print(f"❌ Startup ingestion failed: {e}")
 
 def _start_ingest_thread() -> None:
     if not AUTO_INGEST:
         return
     t = threading.Thread(target=_background_ingest, name="firebase-ingest", daemon=True)
     t.start()
 
 app = Flask(__name__)
 if CORS:
     CORS(app, resources={r"/*": {"origins": "*"}})
 start_time = time.time()
 
-# Start ingestion in background as soon as the module is imported / app is created
-_start_ingest_thread()
 
 @app.route('/health')
 def health() -> Any:
@@ -56,13 +197,28 @@ def health() -> Any:
 ## Simplified API: only /health and /fact-check provided. Data ingestion occurs automatically on startup.
 
 def _run_fact_check(claim: str): # internal helper
     if not INGEST_STATUS.get("finished"):
         return {"verdict": "INITIALIZING", "reasoning": "Ingestion still in progress. Try again soon.", "confidence": 0.0}, 503
-    result = system.fact_check(claim)
-    if result.get('verdict') == 'ERROR' and '402' in result.get('reasoning',''):
-        result['verdict'] = 'UNVERIFIED'
-        result['reasoning'] = 'LLM quota/credits insufficient. Retrieval performed; provide API key to enable full analysis.'
-    return result, 200
 
 @app.route('/fact-check', methods=['POST','GET'])
 def fact_check() -> Any:
 
@@ -3,6 +3,12 @@ from flask import Flask, jsonify, request
 from typing import Any, Dict
 import time
 import threading
+import os
+try:
+    import fcntl
+except ImportError:
+    # Windows doesn't have fcntl, use alternative locking
+    fcntl = None
 from .orchestrator import FactCheckSystem
 from .firebase_service import FirebaseVectorSync
 try:
@@ -10,44 +16,179 @@ try:
 except Exception: # pragma: no cover
     CORS = None # type: ignore
 
+# Global initialization with proper error handling
+system = None
+firebase_sync = None
+AUTO_INGEST = os.environ.get("AUTO_INGEST", "true").lower() in ("true", "1", "yes")
 INGEST_STATUS: Dict[str, Any] = {"started": time.time(), "finished": False, "synced": 0}
+INIT_LOCK = threading.Lock()
+INGEST_LOCK_FILE = "/tmp/ingest.lock" if os.name != 'nt' else "ingest.lock"
+
+def _safe_initialize_system():
+    """Initialize system with proper error handling and singleton pattern."""
+    global system, firebase_sync
+
+    with INIT_LOCK:
+        if system is not None:
+            return
+
+        try:
+            print("Initializing fact-check system...")
+
+            # Use safe directory for vector store
+            vector_dir = "/tmp/vector_db"
+            if os.path.exists("/data"):
+                vector_dir = "/data/vector_db"
+
+            system = FactCheckSystem(vector_dir=vector_dir)
+            firebase_sync = FirebaseVectorSync()
+            print("βœ… System initialized successfully")
+
+        except Exception as e:
+            print(f"❌ System initialization failed: {e}")
+            # Create minimal fallback system
+            try:
+                system = FactCheckSystem(vector_dir="/tmp/vector_db_fallback")
+                firebase_sync = None
+                print("⚠️ Using fallback system")
+            except Exception as fallback_error:
+                print(f"❌ Even fallback failed: {fallback_error}")
+                system = None
+                firebase_sync = None
+
+def _is_ingest_locked():
+    """Check if another process is already running ingestion."""
+    try:
+        return os.path.exists(INGEST_LOCK_FILE)
+    except:
+        return False
+
+def _acquire_ingest_lock():
+    """Acquire lock for ingestion process."""
+    try:
+        if fcntl: # Unix-like systems
+            with open(INGEST_LOCK_FILE, 'w') as f:
+                fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+                f.write(str(os.getpid()))
+            return True
+        else: # Windows or systems without fcntl
+            if os.path.exists(INGEST_LOCK_FILE):
+                return False
+            with open(INGEST_LOCK_FILE, 'w') as f:
+                f.write(str(os.getpid()))
+            return True
+    except (IOError, OSError):
+        return False
+
+def _release_ingest_lock():
+    """Release ingestion lock."""
+    try:
+        if os.path.exists(INGEST_LOCK_FILE):
+            os.remove(INGEST_LOCK_FILE)
+    except:
+        pass
 
 def _background_ingest() -> None:
+    """Background ingestion with proper locking and error handling."""
+    global system
+
+    # Check if another worker is already doing ingestion
+    if _is_ingest_locked():
+        print("⏳ Another process is handling ingestion, skipping...")
+        INGEST_STATUS.update({"finished": True, "skipped": True})
+        return
+
+    # Try to acquire lock
+    if not _acquire_ingest_lock():
+        print("⏳ Could not acquire ingestion lock, skipping...")
+        INGEST_STATUS.update({"finished": True, "skipped": True})
+        return
+
     try:
+        _safe_initialize_system()
+
+        if system is None:
+            print("❌ System not initialized, cannot perform ingestion")
+            INGEST_STATUS.update({"finished": True, "error": "System initialization failed"})
+            return
+
+        print("πŸš€ Refreshing vector store with latest Firebase articles...")
+
+        # Add retry logic with exponential backoff for Firebase API
+        max_retries = 3
+        base_delay = 5 # Start with 5 seconds
+
+        for attempt in range(max_retries):
+            try:
+                # Use smaller limit to avoid rate limiting
+                limit = min(1000, 5000) # Start with smaller batch
+                ingest_res = system.ingest_firebase(limit=limit)
+
+                if ingest_res.get("success"):
+                    INGEST_STATUS.update({"finished": True, **ingest_res})
+                    print(f"βœ… Startup ingestion complete: {ingest_res.get('synced')} articles")
+                    break
+                else:
+                    error_msg = ingest_res.get("error", "Unknown error")
+                    if "429" in str(error_msg) and attempt < max_retries - 1:
+                        delay = base_delay * (2 ** attempt) # Exponential backoff
+                        print(f"⏳ Rate limited, waiting {delay}s before retry {attempt + 1}/{max_retries}")
+                        time.sleep(delay)
+                        continue
+                    else:
+                        print(f"⚠️ Startup ingestion failed: {error_msg}")
+                        INGEST_STATUS.update({"finished": True, "error": error_msg})
+                        break
+
+            except Exception as e:
+                if "429" in str(e) and attempt < max_retries - 1:
+                    delay = base_delay * (2 ** attempt)
+                    print(f"⏳ Rate limited (exception), waiting {delay}s before retry {attempt + 1}/{max_retries}")
+                    time.sleep(delay)
+                    continue
+                else:
+                    raise e
+
         # Log LLM availability
+        try:
+            if system and system.analyzer and system.analyzer.client:
+                print(f"πŸ€– LLM active: model={system.cfg.model} max_tokens={system.cfg.max_tokens}")
+            else:
+                print("⚠️ No LLM API key detected. Using heuristic fallback.")
+        except Exception as e:
+            print(f"⚠️ Could not check LLM status: {e}")
+
     except Exception as e:
         INGEST_STATUS.update({"finished": True, "error": str(e)})
         print(f"❌ Startup ingestion failed: {e}")
+    finally:
+        _release_ingest_lock()
 
 def _start_ingest_thread() -> None:
+    """Start ingestion thread with proper conditions."""
     if not AUTO_INGEST:
+        print("⏭️ Auto-ingestion disabled")
         return
+
+    # Only start if we're not already finished
+    if INGEST_STATUS.get("finished"):
+        return
+
     t = threading.Thread(target=_background_ingest, name="firebase-ingest", daemon=True)
     t.start()
+    print("πŸ”„ Started background ingestion thread")
 
 app = Flask(__name__)
 if CORS:
     CORS(app, resources={r"/*": {"origins": "*"}})
 start_time = time.time()
 
+# Initialize system safely
+_safe_initialize_system()
+
+# Start ingestion in background only for the main process
+if os.environ.get('WERKZEUG_RUN_MAIN') != 'true' or not hasattr(os, 'fork'):
+    _start_ingest_thread()
 
 @app.route('/health')
 def health() -> Any:
@@ -56,13 +197,28 @@ def health() -> Any:
 ## Simplified API: only /health and /fact-check provided. Data ingestion occurs automatically on startup.
 
 def _run_fact_check(claim: str): # internal helper
+    """Run fact check with proper error handling."""
+    global system
+
+    # Ensure system is initialized
+    if system is None:
+        _safe_initialize_system()
+
+    if system is None:
+        return {"verdict": "ERROR", "reasoning": "System initialization failed. Please try again later.", "confidence": 0.0}, 503
+
     if not INGEST_STATUS.get("finished"):
         return {"verdict": "INITIALIZING", "reasoning": "Ingestion still in progress. Try again soon.", "confidence": 0.0}, 503
+
+    try:
+        result = system.fact_check(claim)
+        if result.get('verdict') == 'ERROR' and '402' in result.get('reasoning',''):
+            result['verdict'] = 'UNVERIFIED'
+            result['reasoning'] = 'LLM quota/credits insufficient. Retrieval performed; provide API key to enable full analysis.'
+        return result, 200
+    except Exception as e:
+        print(f"❌ Fact check error: {e}")
+        return {"verdict": "ERROR", "reasoning": f"Analysis failed: {str(e)}", "confidence": 0.0}, 500
 
 @app.route('/fact-check', methods=['POST','GET'])
 def fact_check() -> Any:
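
A minimal smoke test for the endpoints touched by this change might look like the sketch below. The /health and /fact-check routes and the 503 INITIALIZING behaviour come from the diff; the port and the "claim" JSON field are assumptions, since the request parsing inside fact_check() is not shown here.

# Hypothetical client-side check; base URL and the "claim" field are assumptions.
import time
import requests

BASE = "http://localhost:5000"

def check_claim(claim: str, retries: int = 12, wait: float = 5.0) -> dict:
    """Poll /fact-check until the background ingestion has finished (non-503 response)."""
    for _ in range(retries):
        resp = requests.post(f"{BASE}/fact-check", json={"claim": claim}, timeout=30)
        if resp.status_code != 503:  # 503 covers INITIALIZING and failed initialization
            return resp.json()
        time.sleep(wait)  # ingestion thread still running; try again later
    return {"verdict": "INITIALIZING", "reasoning": "Gave up waiting for ingestion."}

if __name__ == "__main__":
    print("health:", requests.get(f"{BASE}/health", timeout=10).status_code)
    print(check_claim("CVE-2021-44228 affects Log4j 2.x"))

Because AUTO_INGEST is now read from the environment, the same script can be exercised against an instance started with AUTO_INGEST=false to verify that /fact-check keeps returning 503 until an ingestion has actually completed.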