import os
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests

from .models import NewsArticle


@dataclass
class FirebaseConfig:
    api_key: str
    auth_domain: str
    project_id: str
    storage_bucket: str
    messaging_sender_id: str
    app_id: str


FIREBASE_CONFIG = FirebaseConfig(
    api_key=os.environ.get("FIREBASE_API_KEY", "AIzaSyAX2ZBIB5lkBEEgXydi__Qlb0WBpUmntCk"),
    auth_domain=os.environ.get("FIREBASE_AUTH_DOMAIN", "cve-articles-b4f4f.firebaseapp.com"),
    project_id=os.environ.get("FIREBASE_PROJECT_ID", "cve-articles-b4f4f"),
    storage_bucket=os.environ.get("FIREBASE_STORAGE_BUCKET", "cve-articles-b4f4f.firebasestorage.app"),
    messaging_sender_id=os.environ.get("FIREBASE_MESSAGING_SENDER_ID", "682945772298"),
    app_id=os.environ.get("FIREBASE_APP_ID", "1:682945772298:web:b0d1dab0c7e07f83fad8f3"),
)


class FirebaseNewsLoader:
    """Loads NewsArticle records from Firestore via the REST API."""

    def __init__(self, config: Optional[FirebaseConfig] = None):
        self.config = config or FIREBASE_CONFIG
        self.project_id = self.config.project_id
        self.api_key = self.config.api_key

    def fetch_articles(self, collection_name: str = "articles", limit: int = 5000,
                       language: str = "English") -> List[NewsArticle]:
        """Fetch articles from a collection with an optional limit, language filter,
        and rate-limit handling."""
        try:
            if language:
                # Use a structured query to filter by language server-side
                return self._fetch_articles_with_filter(collection_name, limit, language)
            return self._fetch_articles_simple(collection_name, limit)
        except Exception as e:
            print(f"❌ Firebase error: {e}")
            return []

    def _fetch_articles_with_filter(self, collection_name: str, limit: int,
                                    language: str) -> List[NewsArticle]:
        """Fetch articles using a Firestore structured query with a language filter."""
        try:
            # Firestore structured query endpoint
            query_url = (
                f"https://firestore.googleapis.com/v1/projects/{self.project_id}"
                f"/databases/(default)/documents:runQuery"
            )
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            articles: List[NewsArticle] = []
            request_count = 0
            max_requests = 20  # Limit total requests to avoid rate limiting

            # Build structured query with language filter
            query_data = {
                "structuredQuery": {
                    "from": [{"collectionId": collection_name}],
                    "where": {
                        "fieldFilter": {
                            "field": {"fieldPath": "language"},
                            "op": "EQUAL",
                            "value": {"stringValue": language},
                        }
                    },
                    "orderBy": [
                        {"field": {"fieldPath": "__name__"}, "direction": "DESCENDING"}
                    ],
                }
            }

            # Add limit if specified
            if remaining and remaining > 0:
                query_data["structuredQuery"]["limit"] = min(remaining, 1000)  # Firestore max limit per query

            print(f"🔍 Fetching {language} articles from Firebase...")

            while True:
                if remaining is not None and remaining <= 0:
                    break
                if request_count >= max_requests:
                    print(f"⏳ Reached max requests limit ({max_requests}), stopping to avoid rate limits")
                    break

                # Add delay between requests to avoid rate limiting
                if request_count > 0:
                    time.sleep(0.5)  # 500ms delay between requests

                headers = {"Content-Type": "application/json"}
                params = {"key": self.api_key}
                resp = requests.post(query_url, json=query_data, headers=headers,
                                     params=params, timeout=30)
                request_count += 1

                if resp.status_code == 429:  # Rate limit
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    print(f"❌ Firebase API rate limited: waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                elif resp.status_code != 200:
                    print(f"❌ Firebase structured query failed: {resp.status_code}")
                    if resp.status_code >= 500:  # Server error, might be temporary
                        time.sleep(5)
                        continue
                    break

                data = resp.json()
                # Handle the structured query response format
                # Firebase structured query returns an array of results
                if isinstance(data, list):
                    query_results = data
                else:
                    query_results = data.get("result", data.get("documents", []))

                if not query_results:
                    break

                batch_articles = []
                for result in query_results:
                    # Handle different response formats
                    doc = None
                    if isinstance(result, dict):
                        if "document" in result:
                            doc = result.get("document")
                        else:
                            # Direct document format
                            doc = result
                    if doc:
                        art = self._convert_doc(doc)
                        if art:
                            batch_articles.append(art)

                articles.extend(batch_articles)
                if remaining is not None:
                    remaining -= len(batch_articles)

                # Check if we have more results
                if len(batch_articles) < query_data["structuredQuery"].get("limit", 1000):
                    break  # No more results

                # Update query for next batch (if we need pagination)
                if remaining and remaining > 0 and batch_articles:
                    # For next batch, start after the last document
                    last_doc_name = batch_articles[-1].article_id
                    query_data["structuredQuery"]["startAfter"] = {
                        "values": [{
                            "referenceValue": (
                                f"projects/{self.project_id}/databases/(default)"
                                f"/documents/{collection_name}/{last_doc_name}"
                            )
                        }]
                    }
                else:
                    break

            print(f"✅ Fetched {len(articles)} {language} articles from Firebase")
            return articles
        except Exception as e:
            print(f"❌ Error in filtered fetch: {e}")
            # Fallback to simple fetch without filter
            return self._fetch_articles_simple(collection_name, limit)

    def _fetch_articles_simple(self, collection_name: str, limit: int) -> List[NewsArticle]:
        """Original simple fetch method without filtering."""
        try:
            base_url = (
                f"https://firestore.googleapis.com/v1/projects/{self.project_id}"
                f"/databases/(default)/documents/{collection_name}"
            )
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            page_token: Optional[str] = None
            batch_size = 100  # Smaller batch size to avoid rate limiting
            articles: List[NewsArticle] = []
            request_count = 0
            max_requests = 50  # Limit total requests to avoid rate limiting

            while True:
                if remaining is not None and remaining <= 0:
                    break
                if request_count >= max_requests:
                    print(f"⏳ Reached max requests limit ({max_requests}), stopping to avoid rate limits")
                    break

                page_size = batch_size if remaining is None else min(batch_size, remaining)
                params = {"key": self.api_key, "pageSize": page_size}
                if page_token:
                    params["pageToken"] = page_token

                # Add delay between requests to avoid rate limiting
                if request_count > 0:
                    time.sleep(0.2)  # 200ms delay between requests

                resp = requests.get(base_url, params=params, timeout=30)
                request_count += 1

                if resp.status_code == 429:  # Rate limit
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    print(f"❌ Firebase API rate limited: waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                elif resp.status_code != 200:
                    print(f"❌ Firebase API failed: {resp.status_code}")
                    if resp.status_code >= 500:  # Server error, might be temporary
                        time.sleep(5)
                        continue
                    break

                data = resp.json()
                docs = data.get("documents", [])
                if not docs:
                    break

                for d in docs:
                    art = self._convert_doc(d)
                    if art:
                        articles.append(art)
                if remaining is not None:
                    remaining -= len(docs)

                page_token = data.get("nextPageToken")
                if not page_token:
                    break

            return articles
        except Exception as e:
            print(f"❌ Firebase error: {e}")
            return []

    def _convert_doc(self, doc: Dict[str, Any]) -> Optional[NewsArticle]:
        try:
            doc_name = doc.get("name", "")
            doc_id = doc_name.split("/")[-1] if doc_name else "unknown"
            fields = doc.get("fields", {})
            data: Dict[str, Any] = {}
            for fname, fval in fields.items():
                if fval and isinstance(fval, dict):
                    # Firestore wraps each value in a single-key map keyed by its type
                    # (e.g. {"stringValue": "..."}); take that key to unwrap the value
                    ftype = list(fval.keys())[0]
                    data[fname] = fval[ftype]
            return NewsArticle(
                title=data.get("Title", data.get("title", "Untitled")),
                content=data.get("Article_text", data.get("content", "")),
                url=data.get("URL", data.get("url", f"firebase://doc/{doc_id}")),
                source=data.get("source", "Firebase"),
                published_date=data.get("Date", data.get("createdAt", datetime.now().isoformat())),
                scraped_date=data.get("scrapedAt", data.get("createdAt", datetime.now().isoformat())),
                article_id=doc_id,
            )
        except Exception as e:
            print(f"⚠️ Conversion error: {e}")
            return None

    def load_news_articles(self, collection_name: str = "Articles", limit: int = 100) -> List[NewsArticle]:
        """Convenience wrapper around fetch_articles."""
        return self.fetch_articles(collection_name, limit)

    def analyze_schema(self, collection_name: str = "Articles") -> Dict[str, Any]:
        """Sample a few documents and report which fields they contain."""
        try:
            url = (
                f"https://firestore.googleapis.com/v1/projects/{self.project_id}"
                f"/databases/(default)/documents/{collection_name}"
            )
            params = {"key": self.api_key, "pageSize": 5}
            response = requests.get(url, params=params, timeout=30)
            if response.status_code == 200:
                data = response.json()
                documents = data.get("documents", [])
                if not documents:
                    return {"error": "empty", "collection": collection_name}
                all_fields = set()
                sample_data = []
                for doc in documents:
                    fields = doc.get("fields", {})
                    field_names = list(fields.keys())
                    all_fields.update(field_names)
                    sample_values: Dict[str, Any] = {}
                    for fname, fdata in fields.items():
                        if fdata and isinstance(fdata, dict):
                            ftype = list(fdata.keys())[0]
                            sample_values[fname] = str(fdata[ftype])[:100]
                    doc_id = doc.get("name", "").split("/")[-1]
                    sample_data.append({"id": doc_id, "fields": field_names, "sample": sample_values})
                return {
                    "collection": collection_name,
                    "document_count": len(documents),
                    "unique_fields": sorted(all_fields),
                    "field_count": len(all_fields),
                    "sample_documents": sample_data,
                }
            return {"error": f"status {response.status_code}", "collection": collection_name}
        except Exception as e:
            return {"error": str(e), "collection": collection_name}

    def get_collections_info(self) -> List[Dict[str, Any]]:
        """Probe likely collection names and report which ones contain articles."""
        possible = ["Articles", "articles"]
        results: List[Dict[str, Any]] = []
        seen = set()
        for name in possible:
            if name in seen:
                continue
            arts = self.fetch_articles(name, limit=5)
            if arts:
                results.append({
                    "name": name,
                    "document_count": "≥" + str(len(arts)),
                    "sample_titles": [a.title for a in arts[:3]],
                })
            seen.add(name)
        if not results:
            results.append({"name": "Articles", "document_count": 0})
        return results
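
# --- Hypothetical usage sketch (not part of the loader itself) --------------
# A minimal example of how FirebaseNewsLoader might be driven. It assumes the
# package is importable so the relative `.models` import resolves (e.g. run
# with `python -m <your_package>.<this_module>`), and that NewsArticle exposes
# the `title` and `published_date` attributes populated in _convert_doc.
if __name__ == "__main__":
    loader = FirebaseNewsLoader()  # uses FIREBASE_CONFIG / environment overrides

    # Fetch a small batch of English-language articles from the default collection
    recent = loader.fetch_articles(collection_name="articles", limit=10, language="English")
    print(f"Loaded {len(recent)} articles")
    for article in recent[:3]:
        print(f"- {article.published_date}  {article.title}")

    # Inspect the collection's schema before relying on specific field names
    schema = loader.analyze_schema("articles")
    print(f"Fields seen in sample documents: {schema.get('unique_fields')}")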