"""
Mylangv2: Process raw text or uploaded document into vectorstore and generate questions via Azure OpenAI using LangChain.
Includes a simple CLI test at the bottom to verify both `process_text` and `process_uploaded_document`.
"""
import os
import logging
import re
import json
import numpy as np
from dotenv import load_dotenv
from typing import Dict, List, Any, Tuple, Optional
# LangChain and Azure OpenAI imports
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from langchain_community.document_loaders import PyPDFLoader
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain.evaluation import load_evaluator
# Vectorstore: FAISS
try:
    from langchain_community.vectorstores import FAISS
except ImportError as e:
    FAISS = None
    logging.warning(
        "FAISS import failed (%s). Falling back to in-memory store. "
        "Install faiss-cpu/faiss-gpu or downgrade NumPy to <2.0 to enable FAISS." % e
    )
# Load env vars
dotenv_path = os.getenv('DOTENV_PATH')
if dotenv_path:
    load_dotenv(dotenv_path)
else:
    load_dotenv()
# Validate env vars
def check_env():
    required = [
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_EMBEDDING_DEPLOYMENT",
        "AZURE_OPENAI_CHAT_DEPLOYMENT",
        "AZURE_OPENAI_API_VERSION"
    ]
    missing = [v for v in required if not os.getenv(v)]
    if missing:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing)}")
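# A minimal .env sketch for the variables validated above (values are
# placeholders, not real credentials; the API version shown is an assumption):
#
#     AZURE_OPENAI_API_KEY=<your-key>
#     AZURE_OPENAI_ENDPOINT=https://<resource>.openai.azure.com/
#     AZURE_OPENAI_EMBEDDING_DEPLOYMENT=<embedding-deployment-name>
#     AZURE_OPENAI_CHAT_DEPLOYMENT=<chat-deployment-name>
#     AZURE_OPENAI_API_VERSION=2024-02-01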
class DocumentProcessor:
    def __init__(
        self,
        embeddings: Optional[AzureOpenAIEmbeddings] = None,
        text_splitter: Optional[RecursiveCharacterTextSplitter] = None
    ):
        """Initialize DocumentProcessor with injectable embeddings and splitter."""
        self.embeddings = embeddings or AzureOpenAIEmbeddings(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            # The env var holds an Azure deployment name, so pass it as azure_deployment
            azure_deployment=os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT")
        )
        self.text_splitter = text_splitter or RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""]
        )
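
    # Injection sketch: both dependencies can be overridden, e.g. with a smaller
    # splitter (parameter values here are illustrative, not recommendations):
    #
    #     dp = DocumentProcessor(
    #         text_splitter=RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    #     )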
    def _create_fallback_vectorstore(self, texts: List[str]) -> Any:
        """Create a basic in-memory vectorstore with cosine-similarity search and embedding shape checks."""
        embs = self.embeddings.embed_documents(texts)
        dim = len(embs[0]) if embs else 0

        class Doc:
            def __init__(self, content: str):
                self.page_content = content

        class BasicVectorStore:
            def __init__(self, texts: List[str], embs: List[List[float]], embeddings: Any, dim: int):
                self.texts = texts
                self.embs = embs
                self.embeddings = embeddings  # bound explicitly rather than via a class attribute
                self.dim = dim

            def similarity_search(self, query: str, k: int = 3) -> List[Doc]:
                q_emb = self.embeddings.embed_query(query)
                if len(q_emb) != self.dim:
                    raise ValueError(f"Query embedding dimension {len(q_emb)} != stored dimension {self.dim}")
                sims = []
                for emb in self.embs:
                    if len(emb) != self.dim:
                        raise ValueError("Stored embedding has unexpected dimension")
                    sims.append(np.dot(q_emb, emb) / (np.linalg.norm(q_emb) * np.linalg.norm(emb)))
                idxs = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:k]
                return [Doc(self.texts[i]) for i in idxs]

        return BasicVectorStore(texts, embs, self.embeddings, dim)
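
    # Behavior sketch for the fallback store (strings are illustrative; note the
    # method is private and normally reached only via process_text/process_uploaded_document):
    #
    #     store = dp._create_fallback_vectorstore(["alpha", "beta"])
    #     print(store.similarity_search("alpha", k=1)[0].page_content)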
    def process_text(self, text: str, persist_directory: Optional[str] = None) -> Tuple[Any, List[str], Dict[str, str]]:
        """Split raw text, build vectorstore (FAISS or fallback), return store, chunks, and metadata."""
        chunks = self.text_splitter.split_text(text)
        backend = 'fallback'
        if FAISS:
            try:
                vs = FAISS.from_texts(texts=chunks, embedding=self.embeddings)
                backend = 'faiss'
                if persist_directory:
                    vs.save_local(persist_directory)
                    _log_vectorstore_size(persist_directory)
                logging.info(f"Processed {len(chunks)} chunks into FAISS vectorstore.")
                return vs, chunks, {'backend': backend}
            except Exception as e:
                logging.warning(f"FAISS.from_texts failed ({e}), using fallback vectorstore.")
        vs_fb = self._create_fallback_vectorstore(chunks)
        logging.info(f"Processed {len(chunks)} chunks into fallback vectorstore.")
        return vs_fb, chunks, {'backend': backend}
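
    # Reload sketch for a persisted index (directory name is hypothetical).
    # Recent langchain_community releases require allow_dangerous_deserialization
    # because the docstore is pickled; enable it only for indexes you created:
    #
    #     vs, chunks, meta = dp.process_text(raw_text, persist_directory="my_index")
    #     if FAISS:
    #         reloaded = FAISS.load_local("my_index", dp.embeddings,
    #                                     allow_dangerous_deserialization=True)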
    def process_uploaded_document(
        self, pdf_path: str, persist_directory: Optional[str] = None
    ) -> Tuple[Any, List[str], Dict[str, str]]:
        """Load PDF, split, build vectorstore, and return store, raw texts, and metadata."""
        loader = PyPDFLoader(pdf_path)
        pages = loader.load()
        docs = self.text_splitter.split_documents(pages)
        texts = [doc.page_content for doc in docs]
        backend = 'fallback'
        if FAISS:
            try:
                vs = FAISS.from_documents(documents=docs, embedding=self.embeddings)
                backend = 'faiss'
                if persist_directory:
                    vs.save_local(persist_directory)
                    _log_vectorstore_size(persist_directory)
                logging.info(f"Processed PDF with {len(texts)} chunks into FAISS vectorstore.")
                return vs, texts, {'backend': backend}
            except Exception as e:
                logging.warning(f"FAISS.from_documents failed ({e}), falling back.")
        vs_fb = self._create_fallback_vectorstore(texts)
        logging.info(f"Processed PDF with {len(texts)} chunks into fallback vectorstore.")
        return vs_fb, texts, {'backend': backend}
class QuestionGenerator:
    def __init__(self, prompt_template_path: Optional[str] = None):
        # Load prompt template from file, or fall back to the default below
        if prompt_template_path and os.path.exists(prompt_template_path):
            with open(prompt_template_path) as f:
                template_str = f.read()
        else:
            # Literal JSON braces are doubled so PromptTemplate does not parse them as variables
            template_str = (
                """
Based on the following context:
{context}
Generate {num_questions} {question_type} questions for:
Subject: {subject}
Class: {class_grade}
Topic: {topic}
Difficulty: {difficulty}
Bloom's Level: {bloom_level}
Additional Instructions: {instructions}
Format as JSON:
{{"questions": [{{"question":"","options":[],"correctAnswer":"","explanation":""}}]}}
"""
            )
        self.llm = AzureChatOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_deployment=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT"),
            temperature=0.3
        )
        self.chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                input_variables=[
                    "context", "num_questions", "question_type", "subject",
                    "class_grade", "topic", "difficulty", "bloom_level", "instructions"
                ],
                template=template_str
            )
        )
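
    # A custom file passed via prompt_template_path is used verbatim, so it must
    # reference the same input variables and double any literal JSON braces,
    # exactly as in the default template above. Sketch of a fragment (not a
    # shipped file):
    #
    #     Generate {num_questions} {question_type} questions on {topic}.
    #     Format as JSON: {{"questions": [...]}}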
    def generate_questions(self, topic_data: Dict[str, Any], vectorstore: Any) -> Dict[str, Any]:
        # Validate topic_data keys
        required_keys = [
            'subjectName', 'sectionName', 'numQuestions', 'questionType',
            'classGrade', 'difficulty', 'bloomLevel'
        ]
        missing = [k for k in required_keys if k not in topic_data]
        if missing:
            raise ValueError(f"Missing required topic_data keys: {', '.join(missing)}")
        context = ""
        if vectorstore:
            docs = vectorstore.similarity_search(
                f"{topic_data['subjectName']} {topic_data['sectionName']}", k=3
            )
            context = "\n".join(getattr(doc, 'page_content', '') for doc in docs)
            logging.info(f"Context length: {len(context)}")
        payload = {
            "context": context,
            "num_questions": topic_data['numQuestions'],
            "question_type": topic_data['questionType'],
            "subject": topic_data['subjectName'],
            "class_grade": topic_data['classGrade'],
            "topic": topic_data['sectionName'],
            "difficulty": topic_data['difficulty'],
            "bloom_level": topic_data['bloomLevel'],
            "instructions": topic_data.get('additionalInstructions', '')
        }
        response = self.chain.invoke(payload)
        text = response.get('text', response) if isinstance(response, dict) else response
        output = text.strip()
        # Strip a Markdown code fence if the model wrapped its JSON in one
        if output.startswith('```') and output.endswith('```'):
            output = re.sub(r'^```[a-zA-Z]*|```$', '', output).strip()
        try:
            result = json.loads(output)
        except json.JSONDecodeError:
            logging.error(f"JSON parsing failed. Raw output: {output}")
            raise
        if 'questions' not in result:
            raise ValueError(f"Missing 'questions' key in output JSON: {result}")
        return result
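
    # Call sketch (field values are illustrative; `vs` is a vectorstore built
    # by DocumentProcessor):
    #
    #     qg = QuestionGenerator()
    #     result = qg.generate_questions(
    #         {"subjectName": "Physics", "sectionName": "Optics",
    #          "numQuestions": 3, "questionType": "MCQ", "classGrade": "10",
    #          "difficulty": "medium", "bloomLevel": "Apply"},
    #         vectorstore=vs,
    #     )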
class QuestionEvaluator:
    def __init__(self):
        common = dict(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_deployment=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")
        )
        self.evaluator = load_evaluator(
            "qa",
            llm=AzureChatOpenAI(**common, temperature=0)
        )
    def evaluate(self, question: str, answer: str, reference: str) -> Dict[str, Any]:
        """Evaluate a question-answer pair against a reference answer."""
        try:
            return self.evaluator.evaluate_strings(
                input=question,
                prediction=answer,
                reference=reference
            )
        except Exception as e:
            logging.error(f"Evaluation error: {e}")
            raise
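
# Evaluation sketch (strings are illustrative):
#
#     qe = QuestionEvaluator()
#     verdict = qe.evaluate("What is 2 + 2?", "4", "4")
#     print(verdict)  # typically includes a grade such as CORRECT/INCORRECT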
# Helper for logging vectorstore size on disk
def _log_vectorstore_size(directory: str):
    total = 0
    for root, _, files in os.walk(directory):
        for f in files:
            total += os.path.getsize(os.path.join(root, f))
    logging.info(f"Vectorstore on disk: {total/1024:.2f} KB")
# CLI test and env validation
def main():
    # Surface the module's logging.info output during the CLI test
    logging.basicConfig(level=logging.INFO)
    # Validate env only on script run
    check_env()
    dp = DocumentProcessor()
    sample = "This is a simple test. It splits into chunks and embeds."
    vs, chunks, meta = dp.process_text(sample)
    print("Chunks:", chunks)
    print("Backend used:", meta['backend'])
    if os.path.exists('sample.pdf'):
        vs2, raw, meta2 = dp.process_uploaded_document('sample.pdf')
        print("PDF raw chunks count:", len(raw), "Backend:", meta2['backend'])

if __name__ == "__main__":
    main()