from __future__ import annotations

import os
import math
import re
import time
from typing import List, Dict, Any, Tuple, Optional

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

try:
    from langchain_chroma import Chroma  # type: ignore
except Exception:
    from langchain_community.vectorstores import Chroma  # type: ignore

from .embeddings import build_embeddings
from .models import NewsArticle


class VectorNewsRetriever:
    """Stores news article chunks in a Chroma vector store and retrieves them with a
    hybrid of semantic similarity and a lightweight BM25 keyword index."""

    def __init__(self, persist_directory: Optional[str] = "./vector_db"):
        # Allow overriding persist directory via env (e.g., /data on Hugging Face Spaces)
        env_dir = os.environ.get("VECTOR_PERSIST_DIR")

        # For containerized environments, use appropriate writable directories
        if not env_dir:
            # Check for writable directories in order of preference
            candidates = ["/data/vector_db", "/tmp/vector_db", "./vector_db"]
            for candidate in candidates:
                try:
                    # Test if we can create the parent directory
                    test_dir = os.path.dirname(candidate)
                    if not os.path.exists(test_dir):
                        os.makedirs(test_dir, exist_ok=True)
                    # Test write permission
                    test_file = os.path.join(test_dir, "test_write")
                    with open(test_file, 'w') as f:
                        f.write("test")
                    os.remove(test_file)
                    persist_directory = candidate
                    break
                except (OSError, PermissionError):
                    continue
            else:
                # If all fail, use in-memory store
                persist_directory = None

        self.persist_directory = env_dir or persist_directory
        self.embeddings = build_embeddings()
        self.vector_store = self._initialize_vector_store()

        # Lightweight in-memory keyword index (lazy-built)
        self._index_ready: bool = False
        self._index_built_at: float = 0.0
        self._N: int = 0  # number of docs
        self._avgdl: float = 0.0
        self._df: Dict[str, int] = {}
        self._postings: Dict[str, Dict[str, int]] = {}  # term -> {doc_id: tf}
        self._doc_len: Dict[str, int] = {}
        self._doc_meta: Dict[str, Dict[str, Any]] = {}  # id -> {content, metadata}
        self._stopwords = set(
            "the a an and or of to in on for from by with without at as is are was were be been being "
            "this that those these it its their his her you your we our not no over under into about "
            "across more most least few many much may might should would could will can "
            "https http www com pk net org www.".split()
        )

    def _initialize_vector_store(self) -> Chroma:
        """Initialize vector store with proper error handling for permission issues."""
        # If no persist directory (failed all write tests), use in-memory
        if self.persist_directory is None:
            print("🔄 Using in-memory vector store (no writable directory found)")
            return Chroma(embedding_function=self.embeddings, collection_name="news_articles_memory")

        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            vs = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
                collection_name="news_articles",
            )
            try:
                count = vs._collection.count()
                print(f"✅ Loaded vector database with {count} documents from {self.persist_directory}")
            except Exception:
                print(f"✅ Vector database loaded from {self.persist_directory}")
            return vs
        except (PermissionError, OSError) as e:
            print(f"❌ Error initializing vector store at {self.persist_directory}: {e}")
            print("🔄 Using in-memory store")
            return Chroma(embedding_function=self.embeddings, collection_name="news_articles_memory")
        except Exception as e:
            print(f"❌ Unexpected error initializing vector store: {e}")
            print("🔄 Using in-memory store")
            return Chroma(embedding_function=self.embeddings, collection_name="news_articles_memory")

    def get_vector_count(self) -> int:
        """Get the current number of documents in the vector store."""
        try:
            if hasattr(self.vector_store, '_collection'):
                return self.vector_store._collection.count()
            # Fallback method
            return len(self.vector_store.get()['ids'])
        except Exception as e:
            print(f"⚠️ Could not get vector count: {e}")
            return 0

    def clear_vector_store(self) -> None:
        """Clear all documents from the vector store to start fresh."""
        try:
            # Delete the collection to start fresh
            self.vector_store.delete_collection()
            print("🗑️ Cleared existing vector store")
            # Reinitialize the vector store
            self.vector_store = self._initialize_vector_store()
        except Exception as e:
            print(f"⚠️ Could not clear vector store: {e}")
            # Fallback: create new in-memory store
            self.vector_store = Chroma(embedding_function=self.embeddings, collection_name="news_articles_fresh")
        # Invalidate keyword index after clear
        self._index_ready = False
        self._df.clear()
        self._postings.clear()
        self._doc_len.clear()
        self._doc_meta.clear()
        self._N = 0
        self._avgdl = 0.0

    def store_articles_in_vector_db(self, articles: List[NewsArticle], clear_first: bool = False) -> None:
        if not articles:
            print("ℹ️ No new articles to store")
            return

        if clear_first:
            self.clear_vector_store()

        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        docs: List[Document] = []

        # Process articles and filter out those with insufficient content
        valid_articles = 0
        skipped_articles = 0

        for art in articles:
            content = art.content or ""
            title = art.title or "Untitled"

            # Skip articles with very little content
            if len(content.strip()) < 50:
                print(f"⚠️ Skipping article '{title[:50]}...' - insufficient content ({len(content)} chars)")
                skipped_articles += 1
                continue

            # Create chunks from the content
            try:
                chunks = splitter.split_text(content)
                if not chunks:
                    print(f"⚠️ No chunks created for article '{title[:50]}...'")
                    skipped_articles += 1
                    continue

                # Create documents for each chunk
                for i, chunk in enumerate(chunks):
                    if len(chunk.strip()) < 30:  # Skip very small chunks
                        continue

                    # Create comprehensive page content
                    page_content = f"Title: {title}\n\n{chunk}"

                    # Add source information if it's not already in the content
                    if art.source and art.source not in chunk:
                        page_content += f"\n\nSource: {art.source}"

                    metadata = {
                        "url": art.url,
                        "source": art.source,
                        "published_date": art.published_date,
                        "scraped_date": art.scraped_date,
                        "id": art.article_id,
                        "chunk_id": f"{art.article_id}_{i}",
                        "title": title,
                    }
                    docs.append(Document(page_content=page_content, metadata=metadata))

                valid_articles += 1
            except Exception as e:
                print(f"❌ Error processing article '{title[:50]}...': {e}")
                skipped_articles += 1
                continue

        print("📊 Article processing summary:")
        print(f"   Total articles: {len(articles)}")
        print(f"   Valid articles: {valid_articles}")
        print(f"   Skipped articles: {skipped_articles}")
        print(f"   Generated chunks: {len(docs)}")

        if not docs:
            print("❌ No valid document chunks to store!")
            return

        # Process documents in batches to avoid Chroma batch size limits
        batch_size = 4000  # Conservative batch size for Chroma
        total_docs = len(docs)
        print(f"📦 Processing {total_docs} document chunks in batches of {batch_size}")

        for i in range(0, total_docs, batch_size):
            batch_docs = docs[i:i + batch_size]
            batch_num = (i // batch_size) + 1
            total_batches = (total_docs + batch_size - 1) // batch_size
            print(f"💾 Storing batch {batch_num}/{total_batches} ({len(batch_docs)} docs)")
            try:
                if hasattr(self.vector_store, "add_documents"):
                    self.vector_store.add_documents(batch_docs)
                else:
                    self.vector_store.add_texts(
                        [d.page_content for d in batch_docs],
                        metadatas=[d.metadata for d in batch_docs],
                    )
                print(f"✅ Successfully stored batch {batch_num}")
            except Exception as e:
                print(f"❌ Failed to store batch {batch_num}: {e}")
                # Continue with next batch instead of failing completely
                continue

        try:
            # Only try to persist if we have a writable directory
            if self.persist_directory and hasattr(self.vector_store, 'persist'):
                self.vector_store.persist()
        except Exception as e:
            print(f"⚠️ Could not persist vector store: {e}")

        print(f"✅ Stored {len(docs)} chunks from {len(articles)} articles")

        # Invalidate index so it is rebuilt on next query
        self._index_ready = False
        self._df.clear()
        self._postings.clear()
        self._doc_len.clear()
        self._doc_meta.clear()
        self._N = 0
        self._avgdl = 0.0

    # -----------------------------
    # Hybrid Retrieval Implementation
    # -----------------------------
    def _tokenize(self, text: str) -> List[str]:
        text = text.lower()
        # Keep alphanumerics as tokens
        tokens = re.split(r"[^a-z0-9]+", text)
        return [t for t in tokens if t and t not in self._stopwords and not t.isdigit()]

    def _ensure_index(self) -> None:
        if self._index_ready:
            return
        try:
            # Prefer direct collection access for efficiency
            docs_data: Optional[Dict[str, Any]] = None
            if hasattr(self.vector_store, "_collection") and self.vector_store._collection is not None:  # type: ignore[attr-defined]
                try:
                    # ids are always returned by Chroma's get(); "ids" is not a valid "include" item
                    docs_data = self.vector_store._collection.get(include=["documents", "metadatas"])  # type: ignore[attr-defined]
                except Exception as e:
                    print(f"⚠️ Could not read collection directly: {e}")
            if docs_data is None:
                try:
                    docs_data = self.vector_store.get()
                except Exception as e:
                    print(f"⚠️ Could not fetch documents for index: {e}")
                    self._index_ready = False
                    return

            ids = docs_data.get("ids", []) or []
            documents = docs_data.get("documents", []) or []
            metadatas = docs_data.get("metadatas", []) or []
            N = len(ids)
            if N == 0:
                self._index_ready = True
                self._N = 0
                self._avgdl = 0.0
                return

            df: Dict[str, int] = {}
            postings: Dict[str, Dict[str, int]] = {}
            doc_len: Dict[str, int] = {}
            doc_meta: Dict[str, Dict[str, Any]] = {}
            total_len = 0

            for doc_id, content, meta in zip(ids, documents, metadatas):
                content = content or ""
                tokens = self._tokenize(content)
                total_len += len(tokens)
                doc_len[doc_id] = len(tokens)
                # compute term frequencies
                tf: Dict[str, int] = {}
                for tok in tokens:
                    tf[tok] = tf.get(tok, 0) + 1
                # update postings and df
                for tok, freq in tf.items():
                    if tok not in postings:
                        postings[tok] = {doc_id: freq}
                        df[tok] = 1
                    else:
                        postings[tok][doc_id] = freq
                        df[tok] = df.get(tok, 0) + 1
                # store meta for reconstruction
                doc_meta[doc_id] = {
                    "content": content,
                    "metadata": meta or {},
                }

            self._N = N
            self._avgdl = (total_len / N) if N else 0.0
            self._df = df
            self._postings = postings
            self._doc_len = doc_len
            self._doc_meta = doc_meta
            self._index_ready = True
            self._index_built_at = time.time()
            # print(f"🔎 Keyword index built for {N} docs (avgdl={self._avgdl:.1f})")
        except Exception as e:
            print(f"⚠️ Failed building keyword index: {e}")
            self._index_ready = False
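
    # The keyword scorer below implements standard Okapi BM25 over the lazily built
    # in-memory index. For a query term t with document frequency df, term frequency
    # tf inside a chunk of token length dl, and average chunk length avgdl over the
    # N indexed chunks:
    #
    #   idf(t)      = ln((N - df + 0.5) / (df + 0.5) + 1)
    #   score(t, d) = idf(t) * tf * (k1 + 1) / (tf + k1 * (1 - b + b * dl / avgdl))
    #
    # with k1 = 1.5 and b = 0.75; a chunk's total score is the sum over query terms.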

    def _bm25_scores(self, query: str) -> Dict[str, float]:
        self._ensure_index()
        if not self._index_ready or self._N == 0:
            return {}
        q_tokens = self._tokenize(query)
        if not q_tokens:
            return {}

        # collect candidate docs (union of postings for query tokens)
        candidate_docs: Dict[str, float] = {}
        k1, b = 1.5, 0.75
        for tok in q_tokens:
            df = self._df.get(tok, 0)
            postings = self._postings.get(tok)
            if not postings or df == 0:
                continue
            # IDF with +1 stabilizer
            idf = math.log((self._N - df + 0.5) / (df + 0.5) + 1.0)
            for doc_id, tf in postings.items():
                dl = self._doc_len.get(doc_id, 0) or 1
                denom = tf + k1 * (1 - b + b * (dl / (self._avgdl or 1.0)))
                score = idf * (tf * (k1 + 1)) / denom
                candidate_docs[doc_id] = candidate_docs.get(doc_id, 0.0) + score
        return candidate_docs

    def _semantic_candidates(self, query: str, n: int) -> List[Tuple[Any, float]]:
        """Return a list of (doc, score) semantic candidates; fall back if scores are not available."""
        try:
            if hasattr(self.vector_store, "similarity_search_with_score"):
                docs_scores = self.vector_store.similarity_search_with_score(query, k=n)
                # docs_scores -> List[Tuple[Document, float]]; for some stores a lower
                # score means closer, so the caller normalizes the direction later
                return docs_scores
            # fallback: without scores, get docs and synthesize decreasing scores
            docs = self.vector_store.similarity_search(query, k=n)
            return list(zip(docs, [1.0 - (i / max(1, n)) for i in range(len(docs))]))
        except Exception as e:
            print(f"❌ Vector search failed: {e}")
            return []

    def _normalize_scores(self, scores: Dict[str, float]) -> Dict[str, float]:
        if not scores:
            return {}
        vals = list(scores.values())
        mx = max(vals)
        mn = min(vals)
        if mx == mn:
            return {key: 1.0 for key in scores}
        return {key: (v - mn) / (mx - mn) for key, v in scores.items()}
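
    # Fusion used by semantic_search below: each signal is min-max normalized to
    # [0, 1] for the current query, then combined as a weighted sum,
    #
    #   fused = 0.6 * semantic + 0.4 * keyword
    #
    # so a candidate with normalized semantic score 0.8 and keyword score 0.5 fuses
    # to 0.6 * 0.8 + 0.4 * 0.5 = 0.68. Fused candidates are then deduplicated by URL
    # and truncated to the requested k.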

    def semantic_search(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Hybrid retrieval: fuse semantic and keyword (BM25-like) signals and return top-k results.

        Maintains the original signature and return shape for compatibility.
        """
        # Guardrails
        k = max(1, min(int(k or 5), 10))

        # Collect candidates
        n_sem = max(k * 2, 10)
        n_kw = max(k * 3, 20)
        sem_pairs = self._semantic_candidates(query, n_sem)

        # Build semantic score map keyed by (url or id)
        sem_scores: Dict[str, float] = {}
        sem_docs_map: Dict[str, Any] = {}
        for d, score in sem_pairs:
            meta = getattr(d, "metadata", {}) or {}
            url = (meta.get("url") or "").strip()
            key = url or getattr(d, "id", None) or id(d)
            sem_scores[key] = float(score if score is not None else 0.0)
            sem_docs_map[key] = d

        # Normalize semantic scores so that higher always means more relevant.
        # Candidates arrive best-first, so if the first raw score is the smallest
        # the store returned distances (lower is better) and we invert before
        # min-max normalization.
        if sem_scores:
            vals = list(sem_scores.values())
            lower_is_better = len(vals) > 1 and vals[0] < vals[-1]
            if lower_is_better:
                maxv = max(vals)
                sem_scores = {key: (maxv - v) for key, v in sem_scores.items()}
            sem_scores = self._normalize_scores(sem_scores)

        # Keyword BM25 candidates. These are keyed by chunk id, so remap them to the
        # same URL-based keys used for the semantic candidates; otherwise the two
        # score sets would never share a key and the fusion below could not combine
        # both signals for the same article.
        kw_raw_scores = self._bm25_scores(query)
        # Keep top n_kw keyword docs
        if kw_raw_scores:
            kw_items = sorted(kw_raw_scores.items(), key=lambda x: x[1], reverse=True)[:n_kw]
            kw_raw_scores = dict(kw_items)
        kw_keyed: Dict[str, float] = {}
        kw_doc_ids: Dict[str, str] = {}  # fusion key -> chunk id, for reconstruction
        for doc_id, score in kw_raw_scores.items():
            meta_entry = self._doc_meta.get(doc_id) or {}
            url = ((meta_entry.get("metadata") or {}).get("url") or "").strip()
            kw_key = url or doc_id
            if score > kw_keyed.get(kw_key, float("-inf")):
                kw_keyed[kw_key] = score
                kw_doc_ids[kw_key] = doc_id
        kw_scores = self._normalize_scores(kw_keyed)

        # Fusion: weighted sum
        alpha = 0.6  # semantic weight
        beta = 0.4   # keyword weight
        fused: Dict[str, float] = {}
        # Include all keys from both sets
        keys = set(sem_scores.keys()) | set(kw_scores.keys())
        for key in keys:
            s = sem_scores.get(key, 0.0)
            w = kw_scores.get(key, 0.0)
            fused[key] = alpha * s + beta * w

        if not fused and sem_docs_map:
            # If the keyword index is not ready, fall back to the semantic docs order
            ordered = sorted(sem_docs_map.items(), key=lambda kv: sem_scores.get(kv[0], 0.0), reverse=True)
            docs = [d for _, d in ordered[:k]]
        elif not fused and kw_scores:
            # If semantic search failed, reconstruct docs from the keyword index metadata
            ordered = sorted(kw_scores.items(), key=lambda kv: kv[1], reverse=True)[:k]
            docs = []
            for key, _ in ordered:
                meta_entry = self._doc_meta.get(kw_doc_ids.get(key, key)) or {}
                content = meta_entry.get("content", "")
                meta = meta_entry.get("metadata", {})
                docs.append(Document(page_content=content, metadata=meta))
        else:
            ordered = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)[:max(k * 2, 20)]
            docs = []
            seen_keys = set()
            for key, _ in ordered:
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                if key in sem_docs_map:
                    docs.append(sem_docs_map[key])
                else:
                    # reconstruct from keyword index
                    meta_entry = self._doc_meta.get(kw_doc_ids.get(key, key)) or {}
                    content = meta_entry.get("content", "")
                    meta = meta_entry.get("metadata", {})
                    docs.append(Document(page_content=content, metadata=meta))

        # Convert to results shape and dedupe by URL
        results: List[Dict[str, Any]] = []
        seen_urls = set()
        for d in docs:
            meta = getattr(d, "metadata", {}) or {}
            content = getattr(d, "page_content", "") or ""
            title = "Unknown"
            if content.startswith("Title: "):
                line = content.splitlines()[0]
                title = line.replace("Title: ", "").strip() or title
            url = (meta.get("url", "") or "").strip()
            if url and url in seen_urls:
                continue
            seen_urls.add(url)
            results.append({
                "title": title,
                "content": content,
                "url": url,
                "source": meta.get("source", "Unknown"),
                "metadata": meta,
            })
            if len(results) >= k:
                break
        return results
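

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the class). Assumptions: this module lives
# in a package that provides `embeddings.build_embeddings` and `models.NewsArticle`
# (the relative imports above), so it must be run as `python -m <package>.<module>`
# rather than as a standalone script, and `NewsArticle` accepts the attributes
# referenced above (title, content, url, source, published_date, scraped_date,
# article_id) as keyword arguments.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    retriever = VectorNewsRetriever()
    sample = NewsArticle(  # hypothetical construction; adjust to the real model
        title="Example headline",
        content="Example body text for the sketch. " * 20,
        url="https://example.com/article",
        source="Example Source",
        published_date="2024-01-01",
        scraped_date="2024-01-02",
        article_id="example-1",
    )
    retriever.store_articles_in_vector_db([sample])
    for hit in retriever.semantic_search("example headline", k=3):
        print(hit["title"], "->", hit["url"])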