"""Firestore REST-API loader for news articles.

Talks to the Firestore REST endpoints directly (no Firebase SDK) and converts
raw documents into :class:`NewsArticle` objects, with rate-limit handling,
language filtering, and content-based heuristics as fallbacks.
"""

import os
import requests
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime

from .models import NewsArticle


@dataclass
class FirebaseConfig:
    """Connection settings for the Firestore REST API."""

    api_key: str
    auth_domain: str
    project_id: str
    storage_bucket: str
    messaging_sender_id: str
    app_id: str
    # Collection names
    ARTICLES_COLLECTION: str = "articles"
    ENGLISH_ARTICLES_COLLECTION: str = "Articles"  # Dedicated English articles collection (capital A)


# NOTE(review/security): the fallback values below embed real-looking project
# credentials. Prefer supplying every value via environment variables and
# rotating the embedded API key.
FIREBASE_CONFIG = FirebaseConfig(
    api_key=os.environ.get("FIREBASE_API_KEY", "AIzaSyAX2ZBIB5lkBEEgXydi__Qlb0WBpUmntCk"),
    auth_domain=os.environ.get("FIREBASE_AUTH_DOMAIN", "cve-articles-b4f4f.firebaseapp.com"),
    project_id=os.environ.get("FIREBASE_PROJECT_ID", "cve-articles-b4f4f"),
    storage_bucket=os.environ.get("FIREBASE_STORAGE_BUCKET", "cve-articles-b4f4f.firebasestorage.app"),
    messaging_sender_id=os.environ.get("FIREBASE_MESSAGING_SENDER_ID", "682945772298"),
    app_id=os.environ.get("FIREBASE_APP_ID", "1:682945772298:web:b0d1dab0c7e07f83fad8f3"),
)


class FirebaseNewsLoader:
    """Fetches and converts news articles stored in Firestore collections."""

    def __init__(self, config: Optional[FirebaseConfig] = None):
        self.config = config or FIREBASE_CONFIG
        self.project_id = self.config.project_id
        self.api_key = self.config.api_key

    def fetch_articles(self, limit: int = 5000, language: str = "English") -> List[NewsArticle]:
        """Fetch articles with optional limit, language filter, and rate limiting handling."""
        try:
            # The main article store is the lowercase collection.
            collection_name = self.config.ARTICLES_COLLECTION
            # Use structured query to filter by language
            if language:
                return self._fetch_articles_with_filter(collection_name, limit, language)
            else:
                return self._fetch_articles_simple(collection_name, limit)
        except Exception as e:
            print(f"❌ Firebase error: {e}")
            return []

    def fetch_articles_by_language(self, language: str = "English", limit: int = 5000) -> List[NewsArticle]:
        """Fetch articles filtered by language - convenience method."""
        return self.fetch_articles(limit=limit, language=language)

    def fetch_english_articles(self, limit: int = 5000) -> List[NewsArticle]:
        """
        Fetch articles from the dedicated English articles collection.
        This mirrors the JavaScript fetchEnglishArticles function.

        Falls back to :meth:`fetch_articles` (language filtering) when the
        collection is missing or the request keeps failing.
        """
        try:
            collection_name = self.config.ENGLISH_ARTICLES_COLLECTION
            print(f"🔍 Fetching English articles from '{collection_name}' collection...")
            # Use simple GET request to fetch from the English articles collection
            base_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            articles: List[NewsArticle] = []
            page_token: Optional[str] = None
            batch_size = min(100, limit or 100)  # Start conservative
            remaining = limit
            consecutive_rate_limits = 0
            max_rate_limit_retries = 5  # Prevent infinite loops

            while True:
                if remaining is not None and remaining <= 0:
                    break
                # Progressive batch size reduction on rate limits
                if consecutive_rate_limits > 2:
                    batch_size = max(10, batch_size // 2)
                    print(f"🔽 Reducing batch size to {batch_size} due to rate limits")
                page_size = batch_size if remaining is None else min(batch_size, remaining)
                params = {"key": self.config.api_key, "pageSize": page_size}
                if page_token:
                    params["pageToken"] = page_token
                print(f"📡 Requesting {page_size} articles from English collection...")
                resp = requests.get(base_url, params=params, timeout=30)

                if resp.status_code == 429:  # Rate limit
                    consecutive_rate_limits += 1
                    if consecutive_rate_limits > max_rate_limit_retries:
                        print(f"❌ Too many consecutive rate limits ({consecutive_rate_limits}), stopping fetch")
                        break
                    # Honor Retry-After when present, else back off progressively.
                    retry_after = int(resp.headers.get('Retry-After', min(60, 10 * consecutive_rate_limits)))
                    print(f"⏳ Rate limited #{consecutive_rate_limits}, waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue
                else:
                    consecutive_rate_limits = 0  # Reset on success

                if resp.status_code != 200:
                    print(f"❌ Failed to fetch English articles: {resp.status_code}")
                    if resp.status_code == 404:
                        print(f"💡 Collection '{collection_name}' not found. Falling back to language filtering...")
                        return self.fetch_articles(limit=limit, language="English")
                    elif resp.status_code >= 500:
                        # NOTE(review): 5xx retries are uncapped here; a
                        # persistently failing server would loop until a
                        # different status arrives. Consider a retry budget.
                        print(f"🔄 Server error {resp.status_code}, retrying...")
                        time.sleep(2)
                        continue
                    else:
                        print(f"🔄 Falling back to language filtering due to error {resp.status_code}")
                        return self.fetch_articles(limit=limit, language="English")

                data = resp.json()
                docs = data.get("documents", [])
                if not docs:
                    print("📭 No more documents in English collection")
                    break

                # Convert documents to NewsArticle objects
                batch_articles = []
                for doc in docs:
                    article = self._convert_english_doc(doc)
                    if article:
                        batch_articles.append(article)
                articles.extend(batch_articles)
                print(f"✅ Processed {len(batch_articles)} articles from batch")

                if remaining is not None:
                    remaining -= len(docs)
                # Check for next page
                page_token = data.get("nextPageToken")
                if not page_token:
                    break
                # Small delay to avoid rate limiting
                time.sleep(0.1)

            print(f"🎯 Successfully fetched {len(articles)} English articles")
            return articles
        except Exception as e:
            print(f"❌ Error fetching English articles: {e}")
            import traceback
            traceback.print_exc()
            # Fallback to the old method
            print("🔄 Falling back to language filtering method...")
            return self.fetch_articles(limit=limit, language="English")

    def _convert_english_doc(self, doc: Dict[str, Any]) -> Optional[NewsArticle]:
        """
        Convert Firebase document from English articles collection to NewsArticle.
        Optimized for the specific structure of English articles.

        Returns None (after logging) when conversion fails.
        """
        # BUG FIX: doc_id must exist before the try block, otherwise the
        # except handler can raise NameError and mask the real error.
        doc_id = "unknown"
        try:
            doc_name = doc.get("name", "")
            doc_id = doc_name.split("/")[-1] if doc_name else "unknown"
            fields = doc.get("fields", {})

            # Extract field values with proper type handling
            data: Dict[str, Any] = {}
            for fname, fval in fields.items():
                if fval and isinstance(fval, dict):
                    # Handle different Firestore value types
                    if "stringValue" in fval:
                        data[fname] = fval["stringValue"]
                    elif "integerValue" in fval:
                        data[fname] = int(fval["integerValue"])
                    elif "doubleValue" in fval:
                        data[fname] = float(fval["doubleValue"])
                    elif "timestampValue" in fval:
                        data[fname] = fval["timestampValue"]
                    elif "booleanValue" in fval:
                        data[fname] = fval["booleanValue"]
                    else:
                        # Get the first available value type
                        ftype = list(fval.keys())[0]
                        data[fname] = fval[ftype]

            # Enhanced field mapping for English articles collection
            # Try multiple field name variations for content
            content_candidates = [
                "content", "Content", "article_text", "Article_text", "articleText",
                "text", "Text", "body", "Body", "description", "Description",
                "summary", "Summary", "article_content", "articleContent", "full_text",
            ]
            content = ""
            content_field_used = None
            for candidate in content_candidates:
                if candidate in data and data[candidate]:
                    content = str(data[candidate]).strip()
                    content_field_used = candidate
                    break

            # Try multiple field name variations for title
            title_candidates = [
                "title", "Title", "headline", "Headline", "subject", "Subject",
                "name", "Name", "article_title", "articleTitle",
            ]
            title = "Untitled"
            for candidate in title_candidates:
                if candidate in data and data[candidate]:
                    title = str(data[candidate]).strip()
                    break

            # Try multiple field name variations for URL
            url_candidates = [
                "url", "URL", "link", "Link", "href", "source_url", "sourceUrl", "web_url",
            ]
            url = f"firebase://english_articles/{doc_id}"
            for candidate in url_candidates:
                if candidate in data and data[candidate]:
                    url_value = str(data[candidate]).strip()
                    if url_value.startswith(('http://', 'https://')):
                        url = url_value
                        break

            # Source information
            source_candidates = ["source", "Source", "publisher", "Publisher", "site", "Site"]
            source = "English Articles Collection"
            for candidate in source_candidates:
                if candidate in data and data[candidate]:
                    source = str(data[candidate]).strip()
                    break

            # Date information
            date_candidates = [
                "published_date", "publishedDate", "date", "Date", "created_at",
                "createdAt", "timestamp", "publish_time", "publication_date",
            ]
            published_date = datetime.now().isoformat()
            for candidate in date_candidates:
                if candidate in data and data[candidate]:
                    published_date = str(data[candidate])
                    break

            # Quality check - ensure we have substantial content
            if len(content) < 100:
                print(f"⚠️ English article {doc_id[:8]}... has minimal content:")
                print(f"   Content field '{content_field_used}': {len(content)} chars")
                print(f"   Available fields: {list(data.keys())}")
                # Try to combine multiple fields if content is insufficient
                combined_content = []
                if title and title != "Untitled":
                    combined_content.append(f"Title: {title}")
                for field_name, field_value in data.items():
                    # NOTE(review): only the first three content candidates are
                    # excluded here, so later candidates may be duplicated into
                    # the combined text — confirm this is intentional.
                    if (isinstance(field_value, str) and len(field_value) > 50 and
                            field_name not in content_candidates[:3]):  # Not already used
                        combined_content.append(f"{field_name}: {field_value}")
                if combined_content:
                    content = "\n\n".join(combined_content)
                    print(f"   📝 Combined content from multiple fields: {len(content)} chars")

            article = NewsArticle(
                title=title,
                content=content,
                url=url,
                source=source,
                published_date=published_date,
                scraped_date=data.get("scraped_date", data.get("scrapedAt", datetime.now().isoformat())),
                article_id=doc_id,
            )
            # Add language marker (since these are from English collection)
            article.language = "english"  # Match the JavaScript implementation
            return article
        except Exception as e:
            print(f"⚠️ Error converting English article {doc_id}: {e}")
            return None

    def _fetch_articles_with_filter(self, collection_name: str, limit: int, language: str) -> List[NewsArticle]:
        """Fetch articles using Firestore structured query with language filter."""
        try:
            # Firestore structured query endpoint
            query_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents:runQuery"
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            articles: List[NewsArticle] = []
            seen_article_ids: set = set()  # BUG FIX: dedupe across language-variant queries

            # First, let's check what the data actually looks like
            print(f"🔍 Analyzing Firebase data structure for language filtering...")
            # Get a small sample first to understand the data structure
            sample_query = {
                "structuredQuery": {
                    "from": [{"collectionId": collection_name}],
                    "limit": 3,
                }
            }
            headers = {'Content-Type': 'application/json'}
            params = {"key": self.api_key}
            sample_resp = requests.post(query_url, json=sample_query, headers=headers, params=params, timeout=30)

            if sample_resp.status_code == 200:
                sample_data = sample_resp.json()
                print(f"📋 Sample response contains {len(sample_data) if isinstance(sample_data, list) else 1} items")
                # Analyze the structure of the first document
                if isinstance(sample_data, list) and len(sample_data) > 0:
                    first_item = sample_data[0]
                    if "document" in first_item:
                        doc = first_item["document"]
                        if "fields" in doc:
                            fields = doc["fields"]
                            available_fields = list(fields.keys())
                            print(f"📊 Available fields: {available_fields}")
                            # Check language field specifically
                            if "language" in fields:
                                lang_field = fields["language"]
                                print(f"🔤 Language field structure: {lang_field}")
                                if "stringValue" in lang_field:
                                    print(f"🔤 Language value: '{lang_field['stringValue']}'")
                            else:
                                print("⚠️ No 'language' field found! Looking for alternatives...")
                                # Check for alternative language field names
                                lang_candidates = [f for f in available_fields if 'lang' in f.lower()]
                                if lang_candidates:
                                    print(f"🔍 Possible language fields: {lang_candidates}")
                                    # Use the first candidate
                                    alt_field = lang_candidates[0]
                                    print(f"🔄 Using '{alt_field}' as language field")
                                else:
                                    print("❌ No language field found. Falling back to content analysis.")
                                    return self._fetch_with_content_filter(collection_name, limit, language)
                    # Sample a few more documents to see language distribution
                    lang_values = set()
                    for item in sample_data:
                        if "document" in item and "fields" in item["document"]:
                            doc_fields = item["document"]["fields"]
                            if "language" in doc_fields and "stringValue" in doc_fields["language"]:
                                lang_values.add(doc_fields["language"]["stringValue"])
                    print(f"🌐 Language values found in sample: {list(lang_values)}")
                elif isinstance(sample_data, dict) and "documents" in sample_data:
                    # Different response format
                    documents = sample_data["documents"]
                    print(f"📋 Found {len(documents)} documents in response")
                    if documents:
                        first_doc = documents[0]
                        if "fields" in first_doc:
                            fields = first_doc["fields"]
                            available_fields = list(fields.keys())
                            print(f"📊 Available fields: {available_fields}")
            else:
                print(f"❌ Sample query failed: {sample_resp.status_code}")
                # Continue anyway with best guess

            def _add_unique(art: Optional[NewsArticle]) -> None:
                # Helper: append converted article once per document id.
                if art and art.article_id not in seen_article_ids:
                    seen_article_ids.add(art.article_id)
                    articles.append(art)

            # Now try to query with language filter
            language_variants = [language, language.lower(), language.upper(), language.capitalize()]
            for lang_variant in language_variants:
                print(f"🔍 Trying language filter: '{lang_variant}'")
                query_data = {
                    "structuredQuery": {
                        "from": [{"collectionId": collection_name}],
                        "where": {
                            "fieldFilter": {
                                "field": {"fieldPath": "language"},
                                "op": "EQUAL",
                                "value": {"stringValue": lang_variant},
                            }
                        },
                        "limit": min(remaining or 1000, 1000),
                    }
                }
                resp = requests.post(query_url, json=query_data, headers=headers, params=params, timeout=30)
                if resp.status_code == 200:
                    data = resp.json()
                    if isinstance(data, list):
                        filtered_count = len(data)
                        print(f"📈 Found {filtered_count} articles with language='{lang_variant}'")
                        if filtered_count > 0:
                            # Process the results
                            for result in data:
                                if "document" in result:
                                    _add_unique(self._convert_doc(result["document"]))
                                elif "fields" in result:
                                    # Direct document format
                                    _add_unique(self._convert_doc(result))
                            # If we got good results, continue with this variant
                            if len(articles) >= 5:  # Lower threshold
                                print(f"✅ Using language variant '{lang_variant}' - found {len(articles)} articles")
                                break
                    elif isinstance(data, dict) and "documents" in data:
                        # Alternative response format
                        documents = data["documents"]
                        filtered_count = len(documents)
                        print(f"📈 Found {filtered_count} documents with language='{lang_variant}'")
                        if filtered_count > 0:
                            for doc in documents:
                                _add_unique(self._convert_doc(doc))
                            if len(articles) >= 5:
                                print(f"✅ Using language variant '{lang_variant}' - found {len(articles)} articles")
                                break
                else:
                    print(f"❌ Query failed for '{lang_variant}': {resp.status_code}")
                time.sleep(0.2)  # Small delay between attempts

            # If we still don't have enough articles, fall back to content filtering
            if len(articles) < 100:
                print(f"⚠️ Only found {len(articles)} articles with language filter. Trying content-based filtering...")
                fallback_articles = self._fetch_with_content_filter(collection_name, remaining or 1000, language)
                # Merge results, avoiding duplicates
                for art in fallback_articles:
                    if art.article_id not in seen_article_ids:
                        seen_article_ids.add(art.article_id)
                        articles.append(art)
                        if remaining and len(articles) >= remaining:
                            break

            print(f"✅ Fetched {len(articles)} {language} articles from Firebase")
            return articles[:remaining] if remaining else articles
        except Exception as e:
            print(f"❌ Error in filtered fetch: {e}")
            import traceback
            traceback.print_exc()
            # Fallback to simple fetch
            return self._fetch_articles_simple(collection_name, limit)

    def _fetch_with_content_filter(self, collection_name: str, limit: int, language: str) -> List[NewsArticle]:
        """Fetch articles and filter by content analysis (fallback method)."""
        print(f"🔄 Fetching articles and filtering by content for {language}...")
        # BUG FIX: guard against limit=None, which previously crashed on limit * 3.
        limit = limit or 1000
        # Fetch more articles to filter from
        raw_articles = self._fetch_articles_simple(collection_name, min(2000, limit * 3))
        filtered_articles = []
        for article in raw_articles:
            if self._is_likely_language(article.content, language):
                filtered_articles.append(article)
                if len(filtered_articles) >= limit:
                    break
        print(f"📊 Content filtering: {len(filtered_articles)} {language} articles from {len(raw_articles)} total")
        return filtered_articles

    def _is_likely_language(self, text: str, target_language: str) -> bool:
        """Simple heuristic to check if text is likely in the target language."""
        if not text or len(text) < 50:
            return False
        if target_language.lower() in ["english", "en"]:
            return self._is_likely_english(text)
        # For other languages, we'll need different heuristics
        # For now, default to True
        return True

    def _is_likely_english(self, text: str) -> bool:
        """Simple heuristic to check if text is likely English."""
        if not text or len(text) < 50:
            return False
        # Common English words and patterns
        english_indicators = {
            'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i', 'it',
            'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do', 'at', 'this',
            'but', 'his', 'by', 'from', 'they', 'we', 'say', 'her', 'she', 'or',
            'an', 'will', 'my', 'one', 'all', 'would', 'there', 'their', 'what',
            'so', 'up', 'out', 'if', 'about', 'who', 'get', 'which', 'go', 'me',
            'when', 'make', 'can', 'like', 'time', 'security', 'vulnerability',
            'attack', 'system', 'software', 'data', 'network', 'computer',
            'application', 'server', 'database', 'information', 'technology',
            'cyber', 'malware', 'breach',
        }
        # Convert to lowercase and split into words
        words = text.lower().replace(',', ' ').replace('.', ' ').split()[:100]  # Check first 100 words
        if len(words) < 10:
            return False
        # Count English indicators
        english_count = 0
        for word in words:
            # Remove punctuation for matching
            clean_word = ''.join(c for c in word if c.isalnum())
            if clean_word in english_indicators:
                english_count += 1
        ratio = english_count / len(words)
        return ratio > 0.15  # At least 15% English indicators

    def _fetch_articles_simple(self, collection_name: str, limit: int) -> List[NewsArticle]:
        """Original simple fetch method without filtering."""
        try:
            base_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            page_token: Optional[str] = None
            batch_size = 100  # Smaller batch size to avoid rate limiting
            articles: List[NewsArticle] = []
            request_count = 0
            max_requests = 50  # Limit total requests to avoid rate limiting

            while True:
                if remaining is not None and remaining <= 0:
                    break
                if request_count >= max_requests:
                    print(f"⏳ Reached max requests limit ({max_requests}), stopping to avoid rate limits")
                    break
                page_size = batch_size if remaining is None else min(batch_size, remaining)
                params = {"key": self.api_key, "pageSize": page_size}
                if page_token:
                    params["pageToken"] = page_token
                # Add delay between requests to avoid rate limiting
                if request_count > 0:
                    time.sleep(0.2)  # 200ms delay between requests
                resp = requests.get(base_url, params=params, timeout=30)
                request_count += 1

                if resp.status_code == 429:  # Rate limit
                    retry_after = int(resp.headers.get('Retry-After', 60))
                    print(f"❌ Firebase API rate limited: waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                if resp.status_code != 200:
                    print(f"❌ Firebase API failed: {resp.status_code}")
                    if resp.status_code >= 500:
                        # Server error, might be temporary
                        time.sleep(5)
                        continue
                    break

                data = resp.json()
                docs = data.get("documents", [])
                if not docs:
                    break
                for d in docs:
                    art = self._convert_doc(d)
                    if art:
                        articles.append(art)
                if remaining is not None:
                    remaining -= len(docs)
                page_token = data.get("nextPageToken")
                if not page_token:
                    break
            return articles
        except Exception as e:
            print(f"❌ Firebase error: {e}")
            return []

    def _convert_doc(self, doc: Dict[str, Any]) -> Optional[NewsArticle]:
        """Convert Firebase document to NewsArticle with improved field mapping."""
        # BUG FIX: doc_id must exist before the try block, otherwise the
        # except handler can raise NameError and mask the real error.
        doc_id = "unknown"
        try:
            doc_name = doc.get("name", "")
            doc_id = doc_name.split("/")[-1] if doc_name else "unknown"
            fields = doc.get("fields", {})

            # Extract field values with better handling
            data: Dict[str, Any] = {}
            for fname, fval in fields.items():
                if fval and isinstance(fval, dict):
                    # Handle different Firestore value types
                    if "stringValue" in fval:
                        data[fname] = fval["stringValue"]
                    elif "integerValue" in fval:
                        data[fname] = fval["integerValue"]
                    elif "doubleValue" in fval:
                        data[fname] = fval["doubleValue"]
                    elif "timestampValue" in fval:
                        data[fname] = fval["timestampValue"]
                    elif "booleanValue" in fval:
                        data[fname] = fval["booleanValue"]
                    else:
                        # Get the first available value type
                        ftype = list(fval.keys())[0]
                        data[fname] = fval[ftype]

            # Try multiple field name variations for content
            content_candidates = [
                "Article_text", "article_text", "content", "Content", "text", "Text",
                "body", "Body", "description", "Description", "summary", "Summary",
                "article_content", "articleContent",
            ]
            content = ""
            content_field = None
            for candidate in content_candidates:
                if candidate in data and data[candidate]:
                    content = str(data[candidate]).strip()
                    content_field = candidate
                    break

            # Try multiple field name variations for title
            title_candidates = [
                "Title", "title", "headline", "Headline", "subject", "Subject",
                "name", "Name",
            ]
            title = "Untitled"
            for candidate in title_candidates:
                if candidate in data and data[candidate]:
                    title = str(data[candidate]).strip()
                    break

            # Try multiple field name variations for URL
            url_candidates = [
                "URL", "url", "link", "Link", "href", "source_url", "sourceUrl",
            ]
            url = f"firebase://doc/{doc_id}"
            for candidate in url_candidates:
                if candidate in data and data[candidate]:
                    url = str(data[candidate]).strip()
                    break

            # Debug output for empty content
            if not content or len(content) < 50:
                available_fields = list(data.keys())
                print(f"⚠️ Article {doc_id[:8]}... has minimal content:")
                print(f"   Content field '{content_field}': {len(content)} chars")
                print(f"   Available fields: {available_fields}")
                print(f"   Sample data: {str(data)[:200]}...")

            article = NewsArticle(
                title=title,
                content=content,
                url=url,
                source=data.get("source", data.get("Source", "Firebase")),
                published_date=data.get("Date", data.get("date", data.get("published_date", data.get("createdAt", datetime.now().isoformat())))),
                scraped_date=data.get("scrapedAt", data.get("scraped_date", data.get("createdAt", datetime.now().isoformat()))),
                article_id=doc_id,
            )
            return article
        except Exception as e:
            print(f"⚠️ Document conversion error for {doc_id}: {e}")
            return None

    def load_news_articles(self, collection_name: str = "Articles", limit: int = 100) -> List[NewsArticle]:
        """Legacy entry point: load up to ``limit`` articles from ``collection_name``.

        BUG FIX: previously forwarded ``collection_name`` positionally into
        ``fetch_articles(limit, language)``, binding the collection name string
        to ``limit`` (and ``limit`` to ``language``), which always failed and
        returned ``[]``. Route through the simple fetcher, which actually
        honors the collection name.
        """
        return self._fetch_articles_simple(collection_name, limit)

    def analyze_schema(self, collection_name: str = "Articles") -> Dict[str, Any]:
        """Sample a few documents and report field names/values for inspection."""
        try:
            url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            params = {"key": self.api_key, "pageSize": 5}
            response = requests.get(url, params=params, timeout=30)
            if response.status_code == 200:
                data = response.json()
                documents = data.get("documents", [])
                if not documents:
                    return {"error": "empty", "collection": collection_name}
                all_fields = set()
                sample_data = []
                for doc in documents:
                    fields = doc.get("fields", {})
                    field_names = list(fields.keys())
                    all_fields.update(field_names)
                    sample_values: Dict[str, Any] = {}
                    for fname, fdata in fields.items():
                        if fdata and isinstance(fdata, dict):
                            ftype = list(fdata.keys())[0]
                            sample_values[fname] = str(fdata[ftype])[:100]
                    doc_id = doc.get("name", "").split("/")[-1]
                    sample_data.append({"id": doc_id, "fields": field_names, "sample": sample_values})
                return {
                    "collection": collection_name,
                    "document_count": len(documents),
                    "unique_fields": sorted(list(all_fields)),
                    "field_count": len(all_fields),
                    "sample_documents": sample_data,
                }
            return {"error": f"status {response.status_code}", "collection": collection_name}
        except Exception as e:
            return {"error": str(e), "collection": collection_name}

    def get_collections_info(self) -> List[Dict[str, Any]]:
        """Probe the known collection names and report which ones have data."""
        possible = ["Articles", "articles"]
        results: List[Dict[str, Any]] = []
        seen = set()
        for name in possible:
            if name in seen:
                continue
            # BUG FIX: previously called fetch_articles(name, limit=5), which
            # raised TypeError (name bound positionally to 'limit' plus the
            # keyword 'limit'). Probe the named collection directly instead.
            arts = self._fetch_articles_simple(name, 5)
            if arts:
                results.append({
                    "name": name,
                    "document_count": "≥" + str(len(arts)),
                    "sample_titles": [a.title for a in arts[:3]],
                })
            seen.add(name)
        if not results:
            results.append({"name": "Articles", "document_count": 0})
        return results