#!/usr/bin/env python3
"""
Comprehensive diagnostic tool to trace all components of the CVE Fact Checker system.
This will identify issues in data fetching, chunking, embeddings, vector store, and retrieval.
"""
import json
import os
import sys
import threading
import time
from datetime import datetime
from typing import Dict, Any, List, Optional
# Put the directory that contains this script at the front of sys.path.
# NOTE(review): presumably this lets the lazy ``cve_factchecker`` imports in the
# diagnostics resolve when the script is run directly -- confirm the package
# actually lives next to this file (the code adds the script's own directory,
# not its parent).
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)
class CVEFactCheckerDiagnostic:
    """Run health checks against the CVE Fact Checker components.

    Every ``diagnose_*`` method returns a dict shaped like
    ``{"status": str, "errors": [str, ...], "data": {...}}`` where ``status``
    is ``"success"`` (no errors recorded), ``"partial"`` (at least one
    sub-check recorded an error) or ``"failed"`` (the component could not be
    exercised, e.g. its modules failed to import).

    Project modules are imported lazily inside each method so that an import
    failure is reported as a diagnostic finding instead of aborting the run.
    """

    def __init__(self) -> None:
        """Record the start timestamp and an environment snapshot."""
        self.results: Dict[str, Any] = {
            "timestamp": datetime.now().isoformat(),
            "environment": self._get_environment_info(),
            "components": {},
        }

    @staticmethod
    def _truncate(text: Optional[str], limit: int = 50) -> str:
        """Return *text* cut to *limit* characters, adding "..." only if cut.

        ``None`` is treated as the empty string so a missing title does not
        turn into a reported error.
        """
        text = text or ""
        return text[:limit] + "..." if len(text) > limit else text

    @staticmethod
    def _overall_status(statuses: List[str]) -> str:
        """Fold per-component statuses into one overall status.

        Returns ``"healthy"`` when every component succeeded, ``"partial"``
        when at least one component is ``"success"`` or ``"partial"``, and
        ``"critical"`` otherwise. Counting ``"partial"`` components here keeps
        a system whose components all work with minor errors from being
        reported as critical.
        """
        if all(status == "success" for status in statuses):
            return "healthy"
        if any(status in ("success", "partial") for status in statuses):
            return "partial"
        return "critical"

    def _get_environment_info(self) -> Dict[str, Any]:
        """Get environment information.

        Only the presence of ``FIREBASE_API_KEY`` is reported, never its value.
        """
        return {
            "python_version": sys.version,
            "working_directory": os.getcwd(),
            "environment_vars": {
                "AUTO_INGEST": os.environ.get("AUTO_INGEST", "not_set"),
                "LANGUAGE_FILTER": os.environ.get("LANGUAGE_FILTER", "not_set"),
                "FIREBASE_API_KEY": "set" if os.environ.get("FIREBASE_API_KEY") else "not_set",
                "HF_HOME": os.environ.get("HF_HOME", "not_set"),
                "TRANSFORMERS_CACHE": os.environ.get("TRANSFORMERS_CACHE", "not_set"),
            },
            "file_system": {
                "/tmp": os.path.exists("/tmp"),
                "/data": os.path.exists("/data"),
                "/app": os.path.exists("/app"),
            },
        }

    def diagnose_firebase_connection(self) -> Dict[str, Any]:
        """Diagnose Firebase connection and data fetching.

        Performs a 1-article fetch, a timed 10-article fetch, and records the
        collection names exposed by ``FirebaseConfig``.
        """
        print("🔍 Diagnosing Firebase Connection...")
        result: Dict[str, Any] = {"status": "unknown", "errors": [], "data": {}}
        try:
            from cve_factchecker.firebase_loader import FirebaseNewsLoader, FirebaseConfig

            # Test Firebase configuration
            loader = FirebaseNewsLoader()
            result["data"]["project_id"] = loader.project_id
            # Report only the key length so the secret never reaches the report.
            result["data"]["api_key_length"] = len(loader.api_key) if loader.api_key else 0

            # Test basic connectivity
            print(" Testing basic Firebase connectivity...")
            try:
                # Test with minimal fetch
                articles = loader.fetch_english_articles(limit=1)
                result["data"]["connectivity"] = "success"
                result["data"]["test_fetch_count"] = len(articles)
                if articles:
                    sample = articles[0]
                    result["data"]["sample_article"] = {
                        "title": self._truncate(sample.title),
                        "content_length": len(sample.content),
                        "has_url": bool(sample.url),
                        "language": getattr(sample, "language", "unknown"),
                    }
            except Exception as e:
                result["errors"].append(f"Firebase connectivity failed: {e}")
                result["data"]["connectivity"] = "failed"

            # Test larger fetch
            print(" Testing larger data fetch...")
            try:
                start_time = time.time()
                articles = loader.fetch_english_articles(limit=10)
                fetch_time = time.time() - start_time
                result["data"]["larger_fetch"] = {
                    "count": len(articles),
                    "time_seconds": round(fetch_time, 2),
                    # max(..., 1) avoids dividing by zero on an empty fetch.
                    "avg_time_per_article": round(fetch_time / max(len(articles), 1), 3),
                }
            except Exception as e:
                result["errors"].append(f"Larger fetch failed: {e}")

            # Test collection accessibility (dummy credentials: only the
            # collection-name attributes are read).
            print(" Testing collection configurations...")
            config = FirebaseConfig(
                api_key="test", auth_domain="test", project_id="test",
                storage_bucket="test", messaging_sender_id="test", app_id="test",
            )
            result["data"]["collections"] = {
                "articles_collection": config.ARTICLES_COLLECTION,
                "english_articles_collection": config.ENGLISH_ARTICLES_COLLECTION,
            }
            result["status"] = "success" if not result["errors"] else "partial"
        except ImportError as e:
            result["errors"].append(f"Import error: {e}")
            result["status"] = "failed"
        except Exception as e:
            result["errors"].append(f"Unexpected error: {e}")
            result["status"] = "failed"
        return result

    def diagnose_chunking_and_embeddings(self) -> Dict[str, Any]:
        """Diagnose chunking strategy and embeddings generation.

        Embeds one fixed sentence and splits one fetched article with a
        ``RecursiveCharacterTextSplitter`` (chunk size 1000, overlap 200).
        """
        print("🔍 Diagnosing Chunking and Embeddings...")
        result: Dict[str, Any] = {"status": "unknown", "errors": [], "data": {}}
        try:
            # VectorNewsRetriever is imported only to verify it is importable.
            from cve_factchecker.retriever import VectorNewsRetriever  # noqa: F401
            from cve_factchecker.embeddings import build_embeddings
            from langchain.text_splitter import RecursiveCharacterTextSplitter
            from cve_factchecker.firebase_loader import FirebaseNewsLoader

            # Test embeddings
            print(" Testing embeddings generation...")
            try:
                embeddings = build_embeddings()
                test_text = "This is a test sentence for embedding generation."
                start_time = time.time()
                test_embedding = embeddings.embed_query(test_text)
                embedding_time = time.time() - start_time
                result["data"]["embeddings"] = {
                    "model_loaded": True,
                    "embedding_dimension": len(test_embedding),
                    "generation_time_seconds": round(embedding_time, 3),
                    "sample_embedding_preview": test_embedding[:5],  # First 5 values
                }
            except Exception as e:
                result["errors"].append(f"Embeddings failed: {e}")
                result["data"]["embeddings"] = {"model_loaded": False}

            # Test chunking strategy
            print(" Testing chunking strategy...")
            try:
                splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
                # Get test content
                loader = FirebaseNewsLoader()
                articles = loader.fetch_english_articles(limit=1)
                if articles:
                    test_article = articles[0]
                    chunks = splitter.split_text(test_article.content)
                    chunk_lengths = [len(chunk) for chunk in chunks]
                    result["data"]["chunking"] = {
                        "strategy": "RecursiveCharacterTextSplitter",
                        "chunk_size": 1000,
                        "chunk_overlap": 200,
                        "test_article_length": len(test_article.content),
                        "chunks_created": len(chunks),
                        "chunk_lengths": chunk_lengths,
                        "avg_chunk_length": sum(chunk_lengths) / len(chunks) if chunks else 0,
                    }
                else:
                    result["errors"].append("No test articles available for chunking test")
            except Exception as e:
                result["errors"].append(f"Chunking test failed: {e}")

            result["status"] = "success" if not result["errors"] else "partial"
        except ImportError as e:
            result["errors"].append(f"Import error: {e}")
            result["status"] = "failed"
        except Exception as e:
            result["errors"].append(f"Unexpected error: {e}")
            result["status"] = "failed"
        return result

    def diagnose_vector_store(self) -> Dict[str, Any]:
        """Diagnose vector store operations and persistence.

        NOTE(review): the ingestion step passes ``clear_first=True`` to
        ``store_articles_in_vector_db``; by its name this presumably wipes the
        existing store -- confirm before running against a store you care about.
        """
        print("🔍 Diagnosing Vector Store...")
        result: Dict[str, Any] = {"status": "unknown", "errors": [], "data": {}}
        try:
            from cve_factchecker.retriever import VectorNewsRetriever
            from cve_factchecker.firebase_loader import FirebaseNewsLoader

            # Stays None when construction fails so later steps can skip
            # cleanly instead of raising NameError.
            retriever = None

            # Test vector store initialization
            print(" Testing vector store initialization...")
            try:
                retriever = VectorNewsRetriever()
                result["data"]["vector_store"] = {
                    "persist_directory": retriever.persist_directory,
                    "initialization": "success",
                }
                # Probe the store with a k=1 search; this yields 0 or 1, i.e.
                # "is anything searchable", not a true document count.
                try:
                    search_results = retriever.semantic_search("test", k=1)
                    result["data"]["current_documents"] = len(search_results) if search_results else 0
                except Exception:
                    result["data"]["current_documents"] = "unknown"
            except Exception as e:
                result["errors"].append(f"Vector store initialization failed: {e}")
                result["data"]["vector_store"] = {"initialization": "failed"}

            # Test complete ingestion process
            print(" Testing complete ingestion process...")
            if retriever is None:
                result["errors"].append("Ingestion test skipped: vector store not initialized")
            else:
                try:
                    loader = FirebaseNewsLoader()
                    articles = loader.fetch_english_articles(limit=3)
                    if articles:
                        start_time = time.time()
                        retriever.store_articles_in_vector_db(articles, clear_first=True)
                        ingestion_time = time.time() - start_time
                        # Test search after ingestion
                        search_results = retriever.semantic_search("test", k=2)
                        result["data"]["ingestion_test"] = {
                            "articles_processed": len(articles),
                            "ingestion_time_seconds": round(ingestion_time, 3),
                            "searchable_chunks": len(search_results),
                            "ingestion_success": len(search_results) > 0,
                        }
                        if search_results:
                            sample_result = search_results[0]
                            result["data"]["sample_search_result"] = {
                                "title": self._truncate(sample_result.get("title", "")),
                                "content_length": len(sample_result.get("content", "")),
                                "has_url": bool(sample_result.get("url")),
                                "metadata_keys": list(sample_result.get("metadata", {}).keys()),
                            }
                    else:
                        result["errors"].append("No articles available for ingestion test")
                except Exception as e:
                    result["errors"].append(f"Ingestion test failed: {e}")

            # Test persistence
            print(" Testing vector store persistence...")
            if retriever is None:
                # Nothing to inspect; the initialization error is already recorded.
                result["data"]["persistence"] = {
                    "checked": False,
                    "reason": "vector store not initialized",
                }
            else:
                try:
                    # .get(): the key is absent if reading persist_directory failed.
                    persist_dir = result["data"]["vector_store"].get("persist_directory")
                    if persist_dir and os.path.exists(persist_dir):
                        files = os.listdir(persist_dir)
                        result["data"]["persistence"] = {
                            "directory_exists": True,
                            "files_count": len(files),
                            "files": files[:10],  # First 10 files
                        }
                    else:
                        result["data"]["persistence"] = {
                            "directory_exists": False,
                            "using_memory_store": True,
                        }
                except Exception as e:
                    result["errors"].append(f"Persistence check failed: {e}")

            result["status"] = "success" if not result["errors"] else "partial"
        except ImportError as e:
            result["errors"].append(f"Import error: {e}")
            result["status"] = "failed"
        except Exception as e:
            result["errors"].append(f"Unexpected error: {e}")
            result["status"] = "failed"
        return result

    def diagnose_fact_checking_pipeline(self) -> Dict[str, Any]:
        """Diagnose the complete fact-checking pipeline.

        Builds a ``FactCheckSystem``, ingests up to 5 articles from the
        ``english_articles`` collection and fact-checks one fixed claim. If
        the system cannot be built the method returns early with status
        ``"failed"``.
        """
        print("🔍 Diagnosing Fact-Checking Pipeline...")
        result: Dict[str, Any] = {"status": "unknown", "errors": [], "data": {}}
        try:
            from cve_factchecker.orchestrator import FactCheckSystem
            from cve_factchecker.config import load_openrouter_config

            # Test system initialization
            print(" Testing system initialization...")
            try:
                system = FactCheckSystem()
                result["data"]["system_initialization"] = "success"
                # Test configuration
                config = load_openrouter_config()
                result["data"]["config"] = {
                    "has_api_key": bool(config.api_key),
                    "model": config.model,
                    "max_tokens": config.max_tokens,
                    "temperature": config.temperature,
                }
            except Exception as e:
                result["errors"].append(f"System initialization failed: {e}")
                result["data"]["system_initialization"] = "failed"
                # Without a system nothing else can run; mark the component
                # failed rather than leaving the status as "unknown".
                result["status"] = "failed"
                return result

            # Test ingestion
            print(" Testing Firebase ingestion...")
            try:
                start_time = time.time()
                ingest_result = system.ingest_firebase(
                    collection="english_articles",
                    limit=5,
                    language="English",
                )
                ingest_time = time.time() - start_time
                result["data"]["ingestion"] = {
                    "success": ingest_result.get("success", False),
                    "synced_count": ingest_result.get("synced", 0),
                    "time_seconds": round(ingest_time, 3),
                    "collection": ingest_result.get("collection"),
                    "error": ingest_result.get("error"),
                }
            except Exception as e:
                result["errors"].append(f"Ingestion test failed: {e}")

            # Test fact-checking
            print(" Testing fact-checking process...")
            try:
                test_claim = "Security researchers discovered a new vulnerability"
                start_time = time.time()
                fact_check_result = system.fact_check(test_claim)
                fact_check_time = time.time() - start_time
                result["data"]["fact_checking"] = {
                    "test_claim": test_claim,
                    "verdict": fact_check_result.get("verdict"),
                    "confidence": fact_check_result.get("confidence"),
                    "reasoning_length": len(fact_check_result.get("reasoning", "")),
                    "sources_used": fact_check_result.get("sources_used", 0),
                    "time_seconds": round(fact_check_time, 3),
                    "has_sources": len(fact_check_result.get("retrieved_articles", [])) > 0,
                }
            except Exception as e:
                result["errors"].append(f"Fact-checking test failed: {e}")

            result["status"] = "success" if not result["errors"] else "partial"
        except ImportError as e:
            result["errors"].append(f"Import error: {e}")
            result["status"] = "failed"
        except Exception as e:
            result["errors"].append(f"Unexpected error: {e}")
            result["status"] = "failed"
        return result

    def diagnose_background_ingestion(self) -> Dict[str, Any]:
        """Diagnose background ingestion issues.

        Inspects the ingest lock file, the ``AUTO_INGEST`` and
        ``WERKZEUG_RUN_MAIN`` environment variables, and the threads alive in
        this process. Uses only the standard library.
        """
        print("🔍 Diagnosing Background Ingestion...")
        result: Dict[str, Any] = {"status": "unknown", "errors": [], "data": {}}
        try:
            # Check lock file issues
            lock_file = "/tmp/ingest.lock" if os.name != "nt" else "ingest.lock"
            result["data"]["lock_file"] = {
                "path": lock_file,
                "exists": os.path.exists(lock_file),
                "can_write_tmp": os.access("/tmp", os.W_OK) if os.path.exists("/tmp") else False,
            }

            # Test lock mechanisms
            if os.path.exists(lock_file):
                try:
                    # errors="replace" keeps odd bytes from hiding the content.
                    with open(lock_file, "r", encoding="utf-8", errors="replace") as f:
                        result["data"]["lock_content"] = f.read()
                except (OSError, UnicodeError):
                    result["data"]["lock_content"] = "unreadable"

            # Test environment variables
            result["data"]["environment"] = {
                "AUTO_INGEST": os.environ.get("AUTO_INGEST", "not_set"),
                "WERKZEUG_RUN_MAIN": os.environ.get("WERKZEUG_RUN_MAIN", "not_set"),
            }

            # Test threading
            try:
                result["data"]["threading"] = {
                    "active_threads": threading.active_count(),
                    "thread_names": [t.name for t in threading.enumerate()],
                }
            except Exception as e:
                result["errors"].append(f"Threading check failed: {e}")

            result["status"] = "success" if not result["errors"] else "partial"
        except Exception as e:
            result["errors"].append(f"Background ingestion diagnosis failed: {e}")
            result["status"] = "failed"
        return result

    def run_complete_diagnosis(self) -> Dict[str, Any]:
        """Run complete system diagnosis.

        Runs every ``diagnose_*`` check, stores each result under
        ``self.results["components"]``, sets ``self.results["overall_status"]``
        and returns ``self.results``.
        """
        print("🏥 CVE Fact Checker - Complete System Diagnosis")
        print("=" * 80)

        # Run all diagnostic components
        components = self.results["components"]
        components["firebase"] = self.diagnose_firebase_connection()
        components["chunking_embeddings"] = self.diagnose_chunking_and_embeddings()
        components["vector_store"] = self.diagnose_vector_store()
        components["fact_checking"] = self.diagnose_fact_checking_pipeline()
        components["background_ingestion"] = self.diagnose_background_ingestion()

        # Calculate overall status
        self.results["overall_status"] = self._overall_status(
            [comp["status"] for comp in components.values()]
        )
        return self.results

    def print_summary(self) -> None:
        """Print a human-readable summary of the diagnosis."""
        print("\n📊 Diagnosis Summary")
        print("=" * 50)
        overall = self.results.get("overall_status", "unknown")
        print(f"Overall Status: {overall.upper()}")

        icons = {"success": "✅", "partial": "⚠️"}
        all_errors: List[str] = []
        for component, data in self.results["components"].items():
            status = data["status"]
            icon = icons.get(status, "❌")
            label = component.replace("_", " ").title()
            print(f"{icon} {label}: {status} ({len(data['errors'])} errors)")
            all_errors.extend(f"{component}: {error}" for error in data["errors"])

        # Print critical errors
        if all_errors:
            print("\n🚨 Critical Issues Found:")
            for error in all_errors[:10]:  # Show first 10 errors
                print(f" • {error}")
            if len(all_errors) > 10:
                print(f" ... and {len(all_errors) - 10} more")

    def save_report(self, filename: str = "diagnosis_report.json") -> None:
        """Save detailed diagnosis report to file.

        Values that are not JSON-serialisable are written via ``str()``.
        A failure to write is printed, not raised.
        """
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(self.results, f, indent=2, default=str)
            print(f"📄 Detailed report saved to: {filename}")
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Could not save report: {e}")
def main() -> bool:
    """Main diagnostic function.

    Runs the complete diagnosis, prints the summary and writes the JSON report
    to the default path.

    Returns:
        True when the overall status is "healthy" or "partial", False otherwise.
    """
    print("🏥 CVE Fact Checker - Complete System Diagnostic")
    print("=" * 80)
    diagnostic = CVEFactCheckerDiagnostic()
    results = diagnostic.run_complete_diagnosis()
    diagnostic.print_summary()
    diagnostic.save_report()
    return results["overall_status"] in ("healthy", "partial")
if __name__ == "__main__":
    # Exit code 0 when main() reports a usable system, 1 otherwise.
    sys.exit(0 if main() else 1)