Spaces:
Sleeping
Sleeping
| """ | |
| Mylangv2: Process raw text or uploaded document into vectorstore and generate questions via Azure OpenAI using LangChain. | |
| Includes a simple CLI test at the bottom to verify both `process_text` and `process_uploaded_document`. | |
| """ | |
| import os | |
| import logging | |
| import re | |
| import json | |
| import numpy as np | |
| from dotenv import load_dotenv | |
| from typing import Dict, List, Any, Tuple, Optional | |
| # LangChain and Azure OpenAI imports | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI | |
| from langchain.document_loaders import PyPDFLoader | |
| from langchain.chains import LLMChain | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain.evaluation import load_evaluator | |
# Optional FAISS backend. When the LangChain wrapper cannot be imported, the
# name is bound to None and DocumentProcessor builds its in-memory fallback.
try:
    from langchain_community.vectorstores import FAISS
except ImportError as import_err:
    FAISS = None
    logging.warning(
        "FAISS import failed (%s). Falling back to in-memory store. "
        "Install faiss-cpu/faiss-gpu or downgrade NumPy to <2.0 to enable FAISS.",
        import_err,
    )
# Load environment variables. DOTENV_PATH, when set and non-empty, names the
# .env file explicitly; otherwise python-dotenv locates one on its own.
dotenv_path = os.getenv('DOTENV_PATH')
if not dotenv_path:
    load_dotenv()
else:
    load_dotenv(dotenv_path)
# Validate env vars
def check_env():
    """Ensure every Azure OpenAI setting this module reads is present.

    A variable counts as missing when it is unset or set to an empty string.

    Raises:
        EnvironmentError: naming all missing variables, in declaration order.
    """
    required_vars = (
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_EMBEDDING_DEPLOYMENT",
        "AZURE_OPENAI_CHAT_DEPLOYMENT",
        "AZURE_OPENAI_API_VERSION",
    )
    absent = []
    for var_name in required_vars:
        if not os.getenv(var_name):
            absent.append(var_name)
    if not absent:
        return
    raise EnvironmentError(f"Missing required environment variables: {', '.join(absent)}")
class DocumentProcessor:
    """Turn raw text or PDF files into a searchable vectorstore.

    FAISS is used when the module-level ``FAISS`` import succeeded and index
    construction works; otherwise a small in-memory cosine-similarity store is
    returned. Both expose ``similarity_search(query, k)``.
    """

    def __init__(
        self,
        embeddings: Optional["AzureOpenAIEmbeddings"] = None,
        text_splitter: Optional["RecursiveCharacterTextSplitter"] = None,
    ):
        """Initialize DocumentProcessor with injectable embeddings and splitter.

        Args:
            embeddings: Object exposing ``embed_documents``/``embed_query``.
                Defaults to ``AzureOpenAIEmbeddings`` configured from the
                AZURE_OPENAI_* environment variables.
            text_splitter: Object exposing ``split_text``/``split_documents``.
                Defaults to a ``RecursiveCharacterTextSplitter`` with
                chunk_size=1000 and chunk_overlap=200.
        """
        # Annotations are quoted so defining this class does not evaluate the
        # LangChain names (useful when injecting test doubles).
        self.embeddings = embeddings or AzureOpenAIEmbeddings(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            model=os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT"),
        )
        self.text_splitter = text_splitter or RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""],
        )

    def _create_fallback_vectorstore(self, texts: List[str]) -> Any:
        """Build a minimal in-memory vectorstore ranked by cosine similarity.

        Args:
            texts: Chunks to index; embedded once via ``embed_documents``.

        Returns:
            An object with ``texts``, ``embs`` and ``embeddings`` attributes and
            a ``similarity_search(query, k=3)`` method that returns up to ``k``
            objects with a ``page_content`` attribute, most similar first.
            Searching an empty store returns an empty list.

        Raises:
            ValueError: when building, if the stored embeddings do not all share
                one dimension; when searching, if the query embedding's
                dimension differs from the stored one.
        """
        embeddings = self.embeddings
        embs = embeddings.embed_documents(texts)
        dim = len(embs[0]) if embs else 0
        # Validate up front so a ragged store fails at build time, not per query.
        if any(len(emb) != dim for emb in embs):
            raise ValueError("Stored embedding has unexpected dimension")
        # (n, dim) matrix and row norms are computed once, not on every query.
        matrix = np.asarray(embs, dtype=float).reshape(len(embs), dim)
        row_norms = np.linalg.norm(matrix, axis=1)

        class Doc:
            """Minimal document holder; only ``page_content`` is provided."""

            def __init__(self, content: str):
                self.page_content = content

        class BasicVectorStore:
            def __init__(self, texts: List[str], embs: List[List[float]]):
                self.texts = texts
                self.embs = embs
                self.embeddings = embeddings

            def similarity_search(self, query: str, k: int = 3) -> List[Doc]:
                """Return up to ``k`` stored chunks most similar to ``query``."""
                # Nothing to rank: skip the embedding call (an empty store has
                # dim 0, which would otherwise fail the dimension check).
                if not self.texts or k <= 0:
                    return []
                q_emb = self.embeddings.embed_query(query)
                if len(q_emb) != dim:
                    raise ValueError(f"Query embedding dimension {len(q_emb)} != stored dimension {dim}")
                q_vec = np.asarray(q_emb, dtype=float)
                dots = matrix @ q_vec
                denom = row_norms * np.linalg.norm(q_vec)
                # Zero-norm vectors would give 0/0 = NaN and scramble the order;
                # score them as 0 similarity instead.
                sims = np.divide(dots, denom, out=np.zeros_like(dots), where=denom > 0)
                # Stable sort on negated scores keeps ties in insertion order.
                order = np.argsort(-sims, kind="stable")[:k]
                return [Doc(self.texts[i]) for i in order]

        return BasicVectorStore(texts, embs)

    @staticmethod
    def _persist_faiss(vs: Any, persist_directory: Optional[str]) -> None:
        """Save a FAISS store to ``persist_directory`` if one is given (best effort)."""
        if not persist_directory:
            return
        try:
            vs.save_local(persist_directory)
            _log_vectorstore_size(persist_directory)
        except Exception as e:
            # The in-memory index is still valid; a disk problem should not
            # discard it and trigger a second round of embedding calls.
            logging.warning(
                f"Saving FAISS vectorstore to {persist_directory!r} failed ({e}); "
                "continuing without persistence."
            )

    def process_text(
        self, text: str, persist_directory: Optional[str] = None
    ) -> Tuple[Any, List[str], Dict[str, str]]:
        """Split raw text, build vectorstore (FAISS or fallback), return store, chunks, and metadata.

        Args:
            text: Raw text to split into chunks.
            persist_directory: If given and FAISS is used, the index is saved
                there; a save failure is logged and the store is still returned.

        Returns:
            ``(vectorstore, chunks, {'backend': 'faiss' | 'fallback'})``.
        """
        chunks = self.text_splitter.split_text(text)
        if FAISS:
            # Only construction is guarded: failing here means "use the fallback".
            try:
                vs = FAISS.from_texts(texts=chunks, embedding=self.embeddings)
            except Exception as e:
                logging.warning(f"FAISS.from_texts failed ({e}), using fallback vectorstore.")
            else:
                self._persist_faiss(vs, persist_directory)
                logging.info(f"Processed {len(chunks)} chunks into FAISS vectorstore.")
                return vs, chunks, {'backend': 'faiss'}
        vs_fb = self._create_fallback_vectorstore(chunks)
        logging.info(f"Processed {len(chunks)} chunks into fallback vectorstore.")
        return vs_fb, chunks, {'backend': 'fallback'}

    def process_uploaded_document(
        self, pdf_path: str, persist_directory: Optional[str] = None
    ) -> Tuple[Any, List[str], Dict[str, str]]:
        """Load PDF, split, build vectorstore, and return store, raw texts, and metadata.

        Args:
            pdf_path: Path to the PDF, loaded with ``PyPDFLoader``.
            persist_directory: If given and FAISS is used, the index is saved
                there; a save failure is logged and the store is still returned.

        Returns:
            ``(vectorstore, chunk_texts, {'backend': 'faiss' | 'fallback'})``.
            The fallback store keeps only the chunk text.
        """
        pages = PyPDFLoader(pdf_path).load()
        docs = self.text_splitter.split_documents(pages)
        texts = [doc.page_content for doc in docs]
        if FAISS:
            try:
                vs = FAISS.from_documents(documents=docs, embedding=self.embeddings)
            except Exception as e:
                logging.warning(f"FAISS.from_documents failed ({e}), falling back.")
            else:
                self._persist_faiss(vs, persist_directory)
                logging.info(f"Processed PDF with {len(texts)} chunks into FAISS vectorstore.")
                return vs, texts, {'backend': 'faiss'}
        vs_fb = self._create_fallback_vectorstore(texts)
        logging.info(f"Processed PDF with {len(texts)} chunks into fallback vectorstore.")
        return vs_fb, texts, {'backend': 'fallback'}
class QuestionGenerator:
    """Generate quiz questions as JSON with an Azure OpenAI chat model.

    Context is retrieved from an optional vectorstore and injected into a
    prompt template; the model's reply is parsed into a dict that must contain
    a ``questions`` key.
    """

    # Built-in prompt. Literal JSON braces are doubled ({{ }}) because the
    # PromptTemplate below uses f-string formatting: single braces around the
    # example JSON would be parsed as a template variable and break formatting.
    _DEFAULT_TEMPLATE = (
        "Based on the following context:\n"
        "{context}\n"
        "Generate {num_questions} {question_type} questions for:\n"
        "Subject: {subject}\n"
        "Class: {class_grade}\n"
        "Topic: {topic}\n"
        "Difficulty: {difficulty}\n"
        "Bloom's Level: {bloom_level}\n"
        "Additional Instructions: {instructions}\n"
        "Format as JSON:\n"
        '{{"questions": [{{"question":"","options":[],"correctAnswer":"","explanation":""}}]}}\n'
    )

    # First ```-fenced block anywhere in a reply, optionally tagged (```json).
    _FENCE_RE = re.compile(r"```[a-zA-Z]*\s*(.*?)```", re.DOTALL)

    def __init__(self, prompt_template_path: Optional[str] = None):
        """Build the chat model and the prompt chain.

        Args:
            prompt_template_path: Optional path to a custom template file. It
                should use the placeholders listed in ``input_variables`` below
                and double any literal braces. If it is not an existing file,
                the built-in template is used and a warning is logged.
        """
        template_str = self._load_template(prompt_template_path)
        self.llm = AzureChatOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            model=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT"),
            temperature=0.3,
        )
        # NOTE(review): LLMChain is deprecated in newer LangChain releases; a
        # `prompt | llm` pipeline returns a message object instead of a dict,
        # which _parse_output already accepts.
        self.chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                input_variables=[
                    "context", "num_questions", "question_type", "subject",
                    "class_grade", "topic", "difficulty", "bloom_level", "instructions",
                ],
                template=template_str,
            ),
        )

    @classmethod
    def _load_template(cls, path: Optional[str]) -> str:
        """Return the template text read from ``path``, or the built-in default."""
        if path and os.path.isfile(path):
            with open(path, encoding="utf-8") as f:
                return f.read()
        if path:
            logging.warning(f"Prompt template file {path!r} not found; using default template.")
        return cls._DEFAULT_TEMPLATE

    @staticmethod
    def _build_context(topic_data: Dict[str, Any], vectorstore: Any) -> str:
        """Join the top-3 chunks for "<subject> <section>"; empty when no store."""
        if not vectorstore:
            return ""
        docs = vectorstore.similarity_search(
            f"{topic_data['subjectName']} {topic_data['sectionName']}", k=3
        )
        context = "\n".join(getattr(doc, 'page_content', '') for doc in docs)
        logging.info(f"Context length: {len(context)}")
        return context

    @classmethod
    def _parse_output(cls, response: Any) -> Dict[str, Any]:
        """Extract the reply text and decode it as a JSON object with ``questions``.

        Raises:
            TypeError: if no text can be extracted from ``response``.
            json.JSONDecodeError: if neither the reply nor its first fenced
                block is valid JSON.
            ValueError: if the JSON is not an object with a ``questions`` key.
        """
        text = response.get('text', response) if isinstance(response, dict) else response
        # Chat-message objects carry their text in ``.content``.
        text = getattr(text, 'content', text)
        if not isinstance(text, str):
            raise TypeError(f"Unexpected LLM response type: {type(text).__name__}")
        output = text.strip()
        if output.startswith('```') and output.endswith('```'):
            output = re.sub(r'^```[a-zA-Z]*|```$', '', output).strip()
        try:
            result = json.loads(output)
        except json.JSONDecodeError:
            # Models sometimes wrap the JSON in prose; retry on the first fenced block.
            fenced = cls._FENCE_RE.search(output)
            if fenced is None:
                logging.error(f"JSON parsing failed. Raw output: {output}")
                raise
            try:
                result = json.loads(fenced.group(1))
            except json.JSONDecodeError:
                logging.error(f"JSON parsing failed. Raw output: {output}")
                raise
        # Guard the type: `'questions' in "..."` would be a substring test.
        if not isinstance(result, dict) or 'questions' not in result:
            raise ValueError(f"Missing 'questions' key in output JSON: {result}")
        return result

    def generate_questions(self, topic_data: Dict[str, Any], vectorstore: Any) -> Dict[str, Any]:
        """Generate questions for one topic.

        Args:
            topic_data: Must contain subjectName, sectionName, numQuestions,
                questionType, classGrade, difficulty and bloomLevel;
                additionalInstructions is optional.
            vectorstore: Object with ``similarity_search(query, k)``, or a falsy
                value to generate without retrieved context.

        Returns:
            The parsed JSON object; it is guaranteed to contain ``questions``.

        Raises:
            ValueError: on missing ``topic_data`` keys or malformed model output.
        """
        required_keys = [
            'subjectName', 'sectionName', 'numQuestions', 'questionType',
            'classGrade', 'difficulty', 'bloomLevel',
        ]
        missing = [k for k in required_keys if k not in topic_data]
        if missing:
            raise ValueError(f"Missing required topic_data keys: {', '.join(missing)}")
        payload = {
            "context": self._build_context(topic_data, vectorstore),
            "num_questions": topic_data['numQuestions'],
            "question_type": topic_data['questionType'],
            "subject": topic_data['subjectName'],
            "class_grade": topic_data['classGrade'],
            "topic": topic_data['sectionName'],
            "difficulty": topic_data['difficulty'],
            "bloom_level": topic_data['bloomLevel'],
            "instructions": topic_data.get('additionalInstructions', ''),
        }
        response = self.chain.invoke(payload)
        return self._parse_output(response)
class QuestionEvaluator:
    """Grade a question/answer pair against a reference with LangChain's "qa" evaluator."""

    def __init__(self):
        # The grading model reads the same Azure settings as the generator;
        # temperature=0 keeps its verdicts as repeatable as possible.
        grader_llm = AzureChatOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            model=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT"),
            temperature=0,
        )
        self.evaluator = load_evaluator("qa", llm=grader_llm)

    def evaluate(self, question: str, answer: str, reference: str) -> Dict[str, Any]:
        """Evaluate question-answer pair against reference.

        Returns whatever ``evaluate_strings`` returns. Any exception it raises is
        logged and then re-raised unchanged.
        """
        call_args = {"input": question, "prediction": answer, "reference": reference}
        try:
            verdict = self.evaluator.evaluate_strings(**call_args)
        except Exception as err:
            logging.error(f"Evaluation error: {err}")
            raise
        return verdict
# Helper for logging vectorstore size
def _log_vectorstore_size(directory: str):
    """Log the combined size in KB of all files found by walking ``directory``."""
    total = sum(
        os.path.getsize(os.path.join(folder, filename))
        for folder, _subdirs, filenames in os.walk(directory)
        for filename in filenames
    )
    logging.info(f"Vectorstore on disk: {total/1024:.2f} KB")
# CLI test and env validation
def main():
    """Command-line smoke test for DocumentProcessor.

    Validates the environment first (only here, not at import time), embeds a
    short sample string, and, if ``sample.pdf`` exists in the working
    directory, processes that PDF too.
    """
    check_env()
    processor = DocumentProcessor()
    sample_text = "This is a simple test. It splits into chunks and embeds."
    _, chunks, text_meta = processor.process_text(sample_text)
    print("Chunks:", chunks)
    print("Backend used:", text_meta['backend'])

    pdf_path = 'sample.pdf'
    if not os.path.exists(pdf_path):
        return
    _, pdf_chunks, pdf_meta = processor.process_uploaded_document(pdf_path)
    print("PDF raw chunks count:", len(pdf_chunks), "Backend:", pdf_meta['backend'])


if __name__ == "__main__":
    main()