""" Mylangv2: Process raw text or uploaded document into vectorstore and generate questions via Azure OpenAI using LangChain. Includes a simple CLI test at the bottom to verify both `process_text` and `process_uploaded_document`. """ import os import logging import re import json import numpy as np from dotenv import load_dotenv from typing import Dict, List, Any, Tuple, Optional # LangChain and Azure OpenAI imports from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI from langchain.document_loaders import PyPDFLoader from langchain.chains import LLMChain from langchain_core.prompts import PromptTemplate from langchain.evaluation import load_evaluator # Vectorstore: FAISS try: from langchain_community.vectorstores import FAISS except ImportError as e: FAISS = None logging.warning( "FAISS import failed (%s). Falling back to in-memory store. " "Install faiss-cpu/faiss-gpu or downgrade NumPy to <2.0 to enable FAISS." % e ) # Load env vars dotenv_path = os.getenv('DOTENV_PATH') if dotenv_path: load_dotenv(dotenv_path) else: load_dotenv() # Validate env vars def check_env(): required = [ "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "AZURE_OPENAI_CHAT_DEPLOYMENT", "AZURE_OPENAI_API_VERSION" ] missing = [v for v in required if not os.getenv(v)] if missing: raise EnvironmentError(f"Missing required environment variables: {', '.join(missing)}") class DocumentProcessor: def __init__( self, embeddings: Optional[AzureOpenAIEmbeddings] = None, text_splitter: Optional[RecursiveCharacterTextSplitter] = None ): """Initialize DocumentProcessor with injectable embeddings and splitter.""" self.embeddings = embeddings or AzureOpenAIEmbeddings( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), model=os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") ) self.text_splitter = text_splitter or RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, separators=["\n\n", "\n", " ", ""] ) def _create_fallback_vectorstore(self, texts: List[str]) -> Any: """Creates a basic in-memory vectorstore with cosine similarity search and embedding shape checks.""" embs = self.embeddings.embed_documents(texts) dim = len(embs[0]) if embs else 0 class Doc: def __init__(self, content: str): self.page_content = content class BasicVectorStore: def __init__(self, texts: List[str], embs: List[List[float]]): self.texts = texts self.embs = embs def similarity_search(self, query: str, k: int = 3) -> List[Doc]: q_emb = self.embeddings.embed_query(query) if len(q_emb) != dim: raise ValueError(f"Query embedding dimension {len(q_emb)} != stored dimension {dim}") sims = [] for emb in self.embs: if len(emb) != dim: raise ValueError("Stored embedding has unexpected dimension") sims.append(np.dot(q_emb, emb) / (np.linalg.norm(q_emb) * np.linalg.norm(emb))) idxs = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:k] return [Doc(self.texts[i]) for i in idxs] # Bind embeddings for inner class BasicVectorStore.embeddings = self.embeddings return BasicVectorStore(texts, embs) def process_text(self, text: str, persist_directory: str = None) -> Tuple[Any, List[str], Dict[str, str]]: """Split raw text, build vectorstore (FAISS or fallback), return store, chunks, and metadata.""" chunks = self.text_splitter.split_text(text) backend = 'fallback' if FAISS: try: vs = FAISS.from_texts(texts=chunks, embedding=self.embeddings) backend = 'faiss' if persist_directory: vs.save_local(persist_directory) _log_vectorstore_size(persist_directory) logging.info(f"Processed {len(chunks)} chunks into FAISS vectorstore.") return vs, chunks, {'backend': backend} except Exception as e: logging.warning(f"FAISS.from_texts failed ({e}), using fallback vectorstore.") vs_fb = self._create_fallback_vectorstore(chunks) logging.info(f"Processed {len(chunks)} chunks into fallback vectorstore.") return vs_fb, chunks, {'backend': backend} def process_uploaded_document( self, pdf_path: str, persist_directory: str = None ) -> Tuple[Any, List[str], Dict[str, str]]: """Load PDF, split, build vectorstore, and return store, raw texts, and metadata.""" loader = PyPDFLoader(pdf_path) pages = loader.load() docs = self.text_splitter.split_documents(pages) texts = [doc.page_content for doc in docs] backend = 'fallback' if FAISS: try: vs = FAISS.from_documents(documents=docs, embedding=self.embeddings) backend = 'faiss' if persist_directory: vs.save_local(persist_directory) _log_vectorstore_size(persist_directory) logging.info(f"Processed PDF with {len(texts)} chunks into FAISS vectorstore.") return vs, texts, {'backend': backend} except Exception as e: logging.warning(f"FAISS.from_documents failed ({e}), falling back.") vs_fb = self._create_fallback_vectorstore(texts) logging.info(f"Processed PDF with {len(texts)} chunks into fallback vectorstore.") return vs_fb, texts, {'backend': backend} class QuestionGenerator: def __init__(self, prompt_template_path: str = None): # Load prompt template from file or default if prompt_template_path and os.path.exists(prompt_template_path): with open(prompt_template_path) as f: template_str = f.read() else: template_str = ( """ Based on the following context: {context} Generate {num_questions} {question_type} questions for: Subject: {subject} Class: {class_grade} Topic: {topic} Difficulty: {difficulty} Bloom's Level: {bloom_level} Additional Instructions: {instructions} Format as JSON: {"questions": [{"question":"","options":[],"correctAnswer":"","explanation":""}]} """ ) self.llm = AzureChatOpenAI( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), model=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT"), temperature=0.3 ) self.chain = LLMChain( llm=self.llm, prompt=PromptTemplate( input_variables=[ "context","num_questions","question_type","subject", "class_grade","topic","difficulty","bloom_level","instructions" ], template=template_str ) ) def generate_questions(self, topic_data: Dict[str, Any], vectorstore: Any) -> Dict[str, Any]: # Validate topic_data keys required_keys = [ 'subjectName','sectionName','numQuestions','questionType', 'classGrade','difficulty','bloomLevel' ] missing = [k for k in required_keys if k not in topic_data] if missing: raise ValueError(f"Missing required topic_data keys: {', '.join(missing)}") context = "" if vectorstore: docs = vectorstore.similarity_search( f"{topic_data['subjectName']} {topic_data['sectionName']}", k=3 ) context = "\n".join(getattr(doc, 'page_content', '') for doc in docs) logging.info(f"Context length: {len(context)}") payload = { "context": context, "num_questions": topic_data['numQuestions'], "question_type": topic_data['questionType'], "subject": topic_data['subjectName'], "class_grade": topic_data['classGrade'], "topic": topic_data['sectionName'], "difficulty": topic_data['difficulty'], "bloom_level": topic_data['bloomLevel'], "instructions": topic_data.get('additionalInstructions','') } response = self.chain.invoke(payload) text = response.get('text', response) if isinstance(response, dict) else response output = text.strip() if output.startswith('```') and output.endswith('```'): output = re.sub(r'^```[a-zA-Z]*|```$', '', output).strip() try: result = json.loads(output) except json.JSONDecodeError: logging.error(f"JSON parsing failed. Raw output: {output}") raise if 'questions' not in result: raise ValueError(f"Missing 'questions' key in output JSON: {result}") return result class QuestionEvaluator: def __init__(self): common = dict( azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), model=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") ) self.evaluator = load_evaluator( "qa", llm=AzureChatOpenAI(**common, temperature=0) ) def evaluate(self, question: str, answer: str, reference: str) -> Dict[str, Any]: """Evaluate question-answer pair against reference.""" try: return self.evaluator.evaluate_strings( input=question, prediction=answer, reference=reference ) except Exception as e: logging.error(f"Evaluation error: {e}") raise # Helper for logging vectorstore size def _log_vectorstore_size(directory: str): total = 0 for root, _, files in os.walk(directory): for f in files: total += os.path.getsize(os.path.join(root, f)) logging.info(f"Vectorstore on disk: {total/1024:.2f} KB") # CLI test and env validation def main(): # Validate env only on script run check_env() dp = DocumentProcessor() sample = "This is a simple test. It splits into chunks and embeds." vs, chunks, meta = dp.process_text(sample) print("Chunks:", chunks) print("Backend used:", meta['backend']) if os.path.exists('sample.pdf'): vs2, raw, meta2 = dp.process_uploaded_document('sample.pdf') print("PDF raw chunks count:", len(raw), "Backend:", meta2['backend']) if __name__ == "__main__": main()