import os
import requests
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime

from .models import NewsArticle

@dataclass
class FirebaseConfig:
    api_key: str
    auth_domain: str
    project_id: str
    storage_bucket: str
    messaging_sender_id: str
    app_id: str
    # Collection names
    ARTICLES_COLLECTION: str = "articles"
    ENGLISH_ARTICLES_COLLECTION: str = "Articles"  # Dedicated English articles collection (capital A)

FIREBASE_CONFIG = FirebaseConfig(
    api_key=os.environ.get("FIREBASE_API_KEY", "AIzaSyAX2ZBIB5lkBEEgXydi__Qlb0WBpUmntCk"),
    auth_domain=os.environ.get("FIREBASE_AUTH_DOMAIN", "cve-articles-b4f4f.firebaseapp.com"),
    project_id=os.environ.get("FIREBASE_PROJECT_ID", "cve-articles-b4f4f"),
    storage_bucket=os.environ.get("FIREBASE_STORAGE_BUCKET", "cve-articles-b4f4f.firebasestorage.app"),
    messaging_sender_id=os.environ.get("FIREBASE_MESSAGING_SENDER_ID", "682945772298"),
    app_id=os.environ.get("FIREBASE_APP_ID", "1:682945772298:web:b0d1dab0c7e07f83fad8f3"),
)
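
# Environment variables take precedence over the literal fallbacks above, so a
# deployment can point at a different Firebase project without code changes,
# e.g. (shell):
#   export FIREBASE_PROJECT_ID=my-other-project
#   export FIREBASE_API_KEY=...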

class FirebaseNewsLoader:
    def __init__(self, config: Optional[FirebaseConfig] = None):
        self.config = config or FIREBASE_CONFIG
        self.project_id = self.config.project_id
        self.api_key = self.config.api_key

    def fetch_articles(self, limit: int = 5000, language: str = "English") -> List[NewsArticle]:
        """Fetch articles with an optional limit, language filter, and rate-limit handling."""
        try:
            collection_name = "articles"
            # Use a structured query to filter by language
            if language:
                return self._fetch_articles_with_filter(collection_name, limit, language)
            else:
                return self._fetch_articles_simple(collection_name, limit)
        except Exception as e:
            print(f"Firebase error: {e}")
            return []
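
    # Usage sketch (hypothetical values; both paths return List[NewsArticle]):
    #   loader = FirebaseNewsLoader()
    #   english = loader.fetch_articles(limit=100, language="English")
    #   unfiltered = loader.fetch_articles(limit=100, language="")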

    def fetch_articles_by_language(self, language: str = "English", limit: int = 5000) -> List[NewsArticle]:
        """Fetch articles filtered by language - convenience method."""
        return self.fetch_articles(limit=limit, language=language)

    def fetch_english_articles(self, limit: int = 5000) -> List[NewsArticle]:
        """
        Fetch articles from the dedicated English articles collection.
        This mirrors the JavaScript fetchEnglishArticles function.
        """
        try:
            collection_name = self.config.ENGLISH_ARTICLES_COLLECTION
            print(f"Fetching English articles from '{collection_name}' collection...")
            # Use a simple GET request to fetch from the English articles collection
            base_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            articles: List[NewsArticle] = []
            page_token: Optional[str] = None
            batch_size = min(300, limit or 300)  # Firestore max pageSize
            remaining = limit
            while True:
                if remaining is not None and remaining <= 0:
                    break
                page_size = batch_size if remaining is None else min(batch_size, remaining)
                params = {
                    "key": self.config.api_key,
                    "pageSize": page_size,
                }
                if page_token:
                    params["pageToken"] = page_token
                print(f"Requesting {page_size} articles from English collection...")
                resp = requests.get(base_url, params=params, timeout=30)
                if resp.status_code == 429:  # Rate limited
                    retry_after = int(resp.headers.get('Retry-After', 30))
                    print(f"Rate limited, waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue
                elif resp.status_code != 200:
                    print(f"Failed to fetch English articles: {resp.status_code}")
                    if resp.status_code == 404:
                        print(f"Collection '{collection_name}' not found. Falling back to language filtering...")
                        return self.fetch_articles(limit=limit, language="English")
                    elif resp.status_code >= 500:
                        print(f"Server error {resp.status_code}, retrying...")
                        time.sleep(2)
                        continue
                    else:
                        print(f"Falling back to language filtering due to error {resp.status_code}")
                        return self.fetch_articles(limit=limit, language="English")
                data = resp.json()
                docs = data.get("documents", [])
                if not docs:
                    print("No more documents in English collection")
                    break
                # Convert documents to NewsArticle objects
                batch_articles = []
                for doc in docs:
                    article = self._convert_english_doc(doc)
                    if article:
                        batch_articles.append(article)
                articles.extend(batch_articles)
                print(f"Processed {len(batch_articles)} articles from batch")
                if remaining is not None:
                    remaining -= len(docs)
                # Check for the next page
                page_token = data.get("nextPageToken")
                if not page_token:
                    break
                # Small delay to avoid rate limiting
                time.sleep(0.1)
            print(f"Successfully fetched {len(articles)} English articles")
            return articles
        except Exception as e:
            print(f"Error fetching English articles: {e}")
            import traceback
            traceback.print_exc()
            # Fall back to the old method
            print("Falling back to language filtering method...")
            return self.fetch_articles(limit=limit, language="English")

    def _convert_english_doc(self, doc: Dict[str, Any]) -> Optional[NewsArticle]:
        """
        Convert a Firebase document from the English articles collection to a NewsArticle.
        Optimized for the specific structure of English articles.
        """
        doc_id = "unknown"  # Defined up front so the except handler can reference it
        try:
            doc_name = doc.get("name", "")
            doc_id = doc_name.split("/")[-1] if doc_name else "unknown"
            fields = doc.get("fields", {})
            # Extract field values with proper type handling
            data: Dict[str, Any] = {}
            for fname, fval in fields.items():
                if fval and isinstance(fval, dict):
                    # Handle the different Firestore value types
                    if "stringValue" in fval:
                        data[fname] = fval["stringValue"]
                    elif "integerValue" in fval:
                        data[fname] = int(fval["integerValue"])
                    elif "doubleValue" in fval:
                        data[fname] = float(fval["doubleValue"])
                    elif "timestampValue" in fval:
                        data[fname] = fval["timestampValue"]
                    elif "booleanValue" in fval:
                        data[fname] = fval["booleanValue"]
                    else:
                        # Fall back to the first available value type
                        ftype = list(fval.keys())[0]
                        data[fname] = fval[ftype]
            # Enhanced field mapping for the English articles collection.
            # Try multiple field-name variations for content.
            content_candidates = [
                "content", "Content", "article_text", "Article_text", "articleText",
                "text", "Text", "body", "Body", "description", "Description",
                "summary", "Summary", "article_content", "articleContent", "full_text",
            ]
            content = ""
            content_field_used = None
            for candidate in content_candidates:
                if candidate in data and data[candidate]:
                    content = str(data[candidate]).strip()
                    content_field_used = candidate
                    break
            # Try multiple field-name variations for the title
            title_candidates = [
                "title", "Title", "headline", "Headline", "subject", "Subject",
                "name", "Name", "article_title", "articleTitle",
            ]
            title = "Untitled"
            for candidate in title_candidates:
                if candidate in data and data[candidate]:
                    title = str(data[candidate]).strip()
                    break
            # Try multiple field-name variations for the URL
            url_candidates = [
                "url", "URL", "link", "Link", "href", "source_url", "sourceUrl", "web_url",
            ]
            url = f"firebase://english_articles/{doc_id}"
            for candidate in url_candidates:
                if candidate in data and data[candidate]:
                    url_value = str(data[candidate]).strip()
                    if url_value.startswith(('http://', 'https://')):
                        url = url_value
                        break
            # Source information
            source_candidates = ["source", "Source", "publisher", "Publisher", "site", "Site"]
            source = "English Articles Collection"
            for candidate in source_candidates:
                if candidate in data and data[candidate]:
                    source = str(data[candidate]).strip()
                    break
            # Date information
            date_candidates = [
                "published_date", "publishedDate", "date", "Date", "created_at", "createdAt",
                "timestamp", "publish_time", "publication_date",
            ]
            published_date = datetime.now().isoformat()
            for candidate in date_candidates:
                if candidate in data and data[candidate]:
                    published_date = str(data[candidate])
                    break
            # Quality check - ensure we have substantial content
            if len(content) < 100:
                print(f"English article {doc_id[:8]}... has minimal content:")
                print(f"  Content field '{content_field_used}': {len(content)} chars")
                print(f"  Available fields: {list(data.keys())}")
                # Try to combine multiple fields if the content is insufficient
                combined_content = []
                if title and title != "Untitled":
                    combined_content.append(f"Title: {title}")
                for field_name, field_value in data.items():
                    if (isinstance(field_value, str) and
                            len(field_value) > 50 and
                            field_name != content_field_used):  # Skip the field already used as content
                        combined_content.append(f"{field_name}: {field_value}")
                if combined_content:
                    content = "\n\n".join(combined_content)
                    print(f"  Combined content from multiple fields: {len(content)} chars")
            article = NewsArticle(
                title=title,
                content=content,
                url=url,
                source=source,
                published_date=published_date,
                scraped_date=data.get("scraped_date", data.get("scrapedAt", datetime.now().isoformat())),
                article_id=doc_id,
            )
            # Mark the language (these come from the English collection)
            article.language = "english"  # Match the JavaScript implementation
            return article
        except Exception as e:
            print(f"Error converting English article {doc_id}: {e}")
            return None
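
    # For reference, the Firestore REST API wraps every field in a typed
    # envelope, which is what the unwrapping loop above undoes, e.g.:
    #   {"title": {"stringValue": "Example"}, "views": {"integerValue": "42"}}
    # Note that integerValue arrives as a string and must be cast explicitly.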

    def _fetch_articles_with_filter(self, collection_name: str, limit: int, language: str) -> List[NewsArticle]:
        """Fetch articles using a Firestore structured query with a language filter."""
        try:
            # Firestore structured-query endpoint
            query_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents:runQuery"
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            articles: List[NewsArticle] = []
            language_field = "language"
            # First, check what the data actually looks like
            print("Analyzing Firebase data structure for language filtering...")
            # Get a small sample first to understand the data structure
            sample_query = {
                "structuredQuery": {
                    "from": [{"collectionId": collection_name}],
                    "limit": 3,
                }
            }
            headers = {'Content-Type': 'application/json'}
            params = {"key": self.api_key}
            sample_resp = requests.post(query_url, json=sample_query, headers=headers, params=params, timeout=30)
            if sample_resp.status_code == 200:
                sample_data = sample_resp.json()
                print(f"Sample response contains {len(sample_data) if isinstance(sample_data, list) else 1} items")
                # Analyze the structure of the first document
                if isinstance(sample_data, list) and len(sample_data) > 0:
                    first_item = sample_data[0]
                    if "document" in first_item:
                        doc = first_item["document"]
                        if "fields" in doc:
                            fields = doc["fields"]
                            available_fields = list(fields.keys())
                            print(f"Available fields: {available_fields}")
                            # Check the language field specifically
                            if "language" in fields:
                                lang_field = fields["language"]
                                print(f"Language field structure: {lang_field}")
                                if "stringValue" in lang_field:
                                    print(f"Language value: '{lang_field['stringValue']}'")
                            else:
                                print("No 'language' field found! Looking for alternatives...")
                                # Check for alternative language field names
                                lang_candidates = [f for f in available_fields if 'lang' in f.lower()]
                                if lang_candidates:
                                    print(f"Possible language fields: {lang_candidates}")
                                    # Use the first candidate in the filter queries below
                                    language_field = lang_candidates[0]
                                    print(f"Using '{language_field}' as the language field")
                                else:
                                    print("No language field found. Falling back to content analysis.")
                                    return self._fetch_with_content_filter(collection_name, limit, language)
                    # Sample a few more documents to see the language distribution
                    lang_values = set()
                    for item in sample_data:
                        if "document" in item and "fields" in item["document"]:
                            doc_fields = item["document"]["fields"]
                            if "language" in doc_fields and "stringValue" in doc_fields["language"]:
                                lang_values.add(doc_fields["language"]["stringValue"])
                    print(f"Language values found in sample: {list(lang_values)}")
                elif isinstance(sample_data, dict) and "documents" in sample_data:
                    # Different response format
                    documents = sample_data["documents"]
                    print(f"Found {len(documents)} documents in response")
                    if documents:
                        first_doc = documents[0]
                        if "fields" in first_doc:
                            fields = first_doc["fields"]
                            available_fields = list(fields.keys())
                            print(f"Available fields: {available_fields}")
            else:
                print(f"Sample query failed: {sample_resp.status_code}")
                # Continue anyway with a best guess
            # Now try to query with the language filter. Deduplicate variants,
            # since e.g. language.capitalize() often equals language itself.
            language_variants = list(dict.fromkeys(
                [language, language.lower(), language.upper(), language.capitalize()]
            ))
            for lang_variant in language_variants:
                print(f"Trying language filter: '{lang_variant}'")
                query_data = {
                    "structuredQuery": {
                        "from": [{"collectionId": collection_name}],
                        "where": {
                            "fieldFilter": {
                                "field": {"fieldPath": language_field},
                                "op": "EQUAL",
                                "value": {"stringValue": lang_variant},
                            }
                        },
                        "limit": min(remaining or 1000, 1000),
                    }
                }
                resp = requests.post(query_url, json=query_data, headers=headers, params=params, timeout=30)
                if resp.status_code == 200:
                    data = resp.json()
                    if isinstance(data, list):
                        filtered_count = len(data)
                        print(f"Found {filtered_count} articles with language='{lang_variant}'")
                        if filtered_count > 0:
                            # Process the results
                            for result in data:
                                if "document" in result:
                                    doc = result["document"]
                                    art = self._convert_doc(doc)
                                    if art:
                                        articles.append(art)
                                elif "fields" in result:  # Direct document format
                                    art = self._convert_doc(result)
                                    if art:
                                        articles.append(art)
                            # If we got good results, stop trying further variants
                            if len(articles) >= 5:  # Low threshold
                                print(f"Using language variant '{lang_variant}' - found {len(articles)} articles")
                                break
                    elif isinstance(data, dict) and "documents" in data:
                        # Alternative response format
                        documents = data["documents"]
                        filtered_count = len(documents)
                        print(f"Found {filtered_count} documents with language='{lang_variant}'")
                        if filtered_count > 0:
                            for doc in documents:
                                art = self._convert_doc(doc)
                                if art:
                                    articles.append(art)
                            if len(articles) >= 5:
                                print(f"Using language variant '{lang_variant}' - found {len(articles)} articles")
                                break
                else:
                    print(f"Query failed for '{lang_variant}': {resp.status_code}")
                time.sleep(0.2)  # Small delay between attempts
            # If we still don't have enough articles, fall back to content filtering
            if len(articles) < 100:
                print(f"Only found {len(articles)} articles with the language filter. Trying content-based filtering...")
                fallback_articles = self._fetch_with_content_filter(collection_name, remaining or 1000, language)
                # Merge the results, avoiding duplicates
                existing_ids = {art.article_id for art in articles}
                for art in fallback_articles:
                    if art.article_id not in existing_ids:
                        articles.append(art)
                    if remaining and len(articles) >= remaining:
                        break
            print(f"Fetched {len(articles)} {language} articles from Firebase")
            return articles[:remaining] if remaining else articles
        except Exception as e:
            print(f"Error in filtered fetch: {e}")
            import traceback
            traceback.print_exc()
            # Fall back to the simple fetch
            return self._fetch_articles_simple(collection_name, limit)

    def _fetch_with_content_filter(self, collection_name: str, limit: int, language: str) -> List[NewsArticle]:
        """Fetch articles and filter by content analysis (fallback method)."""
        print(f"Fetching articles and filtering by content for {language}...")
        # Fetch a larger pool of articles to filter from
        raw_articles = self._fetch_articles_simple(collection_name, min(2000, limit * 3))
        filtered_articles = []
        for article in raw_articles:
            if self._is_likely_language(article.content, language):
                filtered_articles.append(article)
                if len(filtered_articles) >= limit:
                    break
        print(f"Content filtering: {len(filtered_articles)} {language} articles from {len(raw_articles)} total")
        return filtered_articles

    def _is_likely_language(self, text: str, target_language: str) -> bool:
        """Simple heuristic to check whether text is likely in the target language."""
        if not text or len(text) < 50:
            return False
        if target_language.lower() in ["english", "en"]:
            return self._is_likely_english(text)
        # Other languages would need their own heuristics; for now, default to True
        return True

    def _is_likely_english(self, text: str) -> bool:
        """Simple heuristic to check whether text is likely English."""
        if not text or len(text) < 50:
            return False
        # Common English words plus domain-specific security terms
        english_indicators = {
            'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i', 'it', 'for', 'not', 'on', 'with',
            'he', 'as', 'you', 'do', 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we', 'say', 'her',
            'she', 'or', 'an', 'will', 'my', 'one', 'all', 'would', 'there', 'their', 'what', 'so', 'up',
            'out', 'if', 'about', 'who', 'get', 'which', 'go', 'me', 'when', 'make', 'can', 'like', 'time',
            'security', 'vulnerability', 'attack', 'system', 'software', 'data', 'network', 'computer',
            'application', 'server', 'database', 'information', 'technology', 'cyber', 'malware', 'breach',
        }
        # Lowercase the text and split into words, checking only the first 100
        words = text.lower().replace(',', ' ').replace('.', ' ').split()[:100]
        if len(words) < 10:
            return False
        # Count English indicators
        english_count = 0
        for word in words:
            # Strip punctuation for matching
            clean_word = ''.join(c for c in word if c.isalnum())
            if clean_word in english_indicators:
                english_count += 1
        ratio = english_count / len(words)
        return ratio > 0.15  # At least 15% English indicators
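
    # The 0.15 threshold is a rough cut-off, not a calibrated value: common
    # stop words alone usually put English prose well above it, while most
    # non-English text scores near zero against this set. The security terms
    # bias detection toward this loader's cybersecurity domain.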

    def _fetch_articles_simple(self, collection_name: str, limit: int) -> List[NewsArticle]:
        """Original simple fetch method without filtering."""
        try:
            base_url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            remaining = None if (limit is None or (isinstance(limit, int) and limit <= 0)) else int(limit)
            page_token: Optional[str] = None
            batch_size = 100  # Smaller batch size to avoid rate limiting
            articles: List[NewsArticle] = []
            request_count = 0
            max_requests = 50  # Cap total requests to avoid rate limiting
            while True:
                if remaining is not None and remaining <= 0:
                    break
                if request_count >= max_requests:
                    print(f"Reached max request limit ({max_requests}), stopping to avoid rate limits")
                    break
                page_size = batch_size if remaining is None else min(batch_size, remaining)
                params = {"key": self.api_key, "pageSize": page_size}
                if page_token:
                    params["pageToken"] = page_token
                # Delay between requests to avoid rate limiting
                if request_count > 0:
                    time.sleep(0.2)  # 200ms between requests
                resp = requests.get(base_url, params=params, timeout=30)
                request_count += 1
                if resp.status_code == 429:  # Rate limited
                    retry_after = int(resp.headers.get('Retry-After', 60))
                    print(f"Firebase API rate limited: waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                elif resp.status_code != 200:
                    print(f"Firebase API failed: {resp.status_code}")
                    if resp.status_code >= 500:  # Server error, might be temporary
                        time.sleep(5)
                        continue
                    break
                data = resp.json()
                docs = data.get("documents", [])
                if not docs:
                    break
                for d in docs:
                    art = self._convert_doc(d)
                    if art:
                        articles.append(art)
                if remaining is not None:
                    remaining -= len(docs)
                page_token = data.get("nextPageToken")
                if not page_token:
                    break
            return articles
        except Exception as e:
            print(f"Firebase error: {e}")
            return []

    def _convert_doc(self, doc: Dict[str, Any]) -> Optional[NewsArticle]:
        """Convert a Firebase document to a NewsArticle with improved field mapping."""
        doc_id = "unknown"  # Defined up front so the except handler can reference it
        try:
            doc_name = doc.get("name", "")
            doc_id = doc_name.split("/")[-1] if doc_name else "unknown"
            fields = doc.get("fields", {})
            # Extract field values with better handling
            data: Dict[str, Any] = {}
            for fname, fval in fields.items():
                if fval and isinstance(fval, dict):
                    # Handle the different Firestore value types
                    if "stringValue" in fval:
                        data[fname] = fval["stringValue"]
                    elif "integerValue" in fval:
                        data[fname] = fval["integerValue"]
                    elif "doubleValue" in fval:
                        data[fname] = fval["doubleValue"]
                    elif "timestampValue" in fval:
                        data[fname] = fval["timestampValue"]
                    elif "booleanValue" in fval:
                        data[fname] = fval["booleanValue"]
                    else:
                        # Fall back to the first available value type
                        ftype = list(fval.keys())[0]
                        data[fname] = fval[ftype]
            # Try multiple field-name variations for content
            content_candidates = [
                "Article_text", "article_text", "content", "Content",
                "text", "Text", "body", "Body", "description", "Description",
                "summary", "Summary", "article_content", "articleContent",
            ]
            content = ""
            content_field = None
            for candidate in content_candidates:
                if candidate in data and data[candidate]:
                    content = str(data[candidate]).strip()
                    content_field = candidate
                    break
            # Try multiple field-name variations for the title
            title_candidates = [
                "Title", "title", "headline", "Headline", "subject", "Subject", "name", "Name",
            ]
            title = "Untitled"
            for candidate in title_candidates:
                if candidate in data and data[candidate]:
                    title = str(data[candidate]).strip()
                    break
            # Try multiple field-name variations for the URL
            url_candidates = [
                "URL", "url", "link", "Link", "href", "source_url", "sourceUrl",
            ]
            url = f"firebase://doc/{doc_id}"
            for candidate in url_candidates:
                if candidate in data and data[candidate]:
                    url = str(data[candidate]).strip()
                    break
            # Debug output for empty content
            if not content or len(content) < 50:
                available_fields = list(data.keys())
                print(f"Article {doc_id[:8]}... has minimal content:")
                print(f"  Content field '{content_field}': {len(content)} chars")
                print(f"  Available fields: {available_fields}")
                print(f"  Sample data: {str(data)[:200]}...")
            article = NewsArticle(
                title=title,
                content=content,
                url=url,
                source=data.get("source", data.get("Source", "Firebase")),
                published_date=data.get("Date", data.get("date", data.get("published_date", data.get("createdAt", datetime.now().isoformat())))),
                scraped_date=data.get("scrapedAt", data.get("scraped_date", data.get("createdAt", datetime.now().isoformat()))),
                article_id=doc_id,
            )
            return article
        except Exception as e:
            print(f"Document conversion error for {doc_id}: {e}")
            return None

    def load_news_articles(self, collection_name: str = "Articles", limit: int = 100) -> List[NewsArticle]:
        # fetch_articles() does not take a collection name, so honor the
        # requested collection by querying it directly via the simple fetch path.
        return self._fetch_articles_simple(collection_name, limit)

    def analyze_schema(self, collection_name: str = "Articles") -> Dict[str, Any]:
        try:
            url = f"https://firestore.googleapis.com/v1/projects/{self.project_id}/databases/(default)/documents/{collection_name}"
            params = {"key": self.api_key, "pageSize": 5}
            response = requests.get(url, params=params, timeout=30)
            if response.status_code == 200:
                data = response.json()
                documents = data.get("documents", [])
                if not documents:
                    return {"error": "empty", "collection": collection_name}
                all_fields = set()
                sample_data = []
                for doc in documents:
                    fields = doc.get("fields", {})
                    field_names = list(fields.keys())
                    all_fields.update(field_names)
                    sample_values: Dict[str, Any] = {}
                    for fname, fdata in fields.items():
                        if fdata and isinstance(fdata, dict):
                            ftype = list(fdata.keys())[0]
                            sample_values[fname] = str(fdata[ftype])[:100]
                    doc_id = doc.get("name", "").split("/")[-1]
                    sample_data.append({"id": doc_id, "fields": field_names, "sample": sample_values})
                return {
                    "collection": collection_name,
                    "document_count": len(documents),
                    "unique_fields": sorted(all_fields),
                    "field_count": len(all_fields),
                    "sample_documents": sample_data,
                }
            return {"error": f"status {response.status_code}", "collection": collection_name}
        except Exception as e:
            return {"error": str(e), "collection": collection_name}

    def get_collections_info(self) -> List[Dict[str, Any]]:
        possible = ["Articles", "articles"]
        results: List[Dict[str, Any]] = []
        seen = set()
        for name in possible:
            if name in seen:
                continue
            # Probe each candidate collection directly with a small page
            arts = self._fetch_articles_simple(name, 5)
            if arts:
                results.append({
                    "name": name,
                    "document_count": f"≥{len(arts)}",
                    "sample_titles": [a.title for a in arts[:3]],
                })
            seen.add(name)
        if not results:
            results.append({"name": "Articles", "document_count": 0})
        return results
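

# Minimal usage sketch (assumes NewsArticle exposes .title and .url as
# constructed above; run as a module, e.g. `python -m <package>.<this_module>`,
# so the relative import of .models resolves):
if __name__ == "__main__":
    loader = FirebaseNewsLoader()
    print(loader.analyze_schema("Articles"))
    sample = loader.fetch_english_articles(limit=10)
    for art in sample:
        print(f"- {art.title} ({art.url})")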