from haystack.nodes import TransformersQueryClassifier, Docs2Answers
from haystack.nodes import EmbeddingRetriever, FARMReader
from haystack.nodes.base import BaseComponent
from haystack.document_stores import InMemoryDocumentStore
from markdown import markdown
from annotated_text import annotation
from haystack.schema import Document
from typing import List, Text, Union
from typing_extensions import Literal
from utils.preprocessing import processingpipeline
from utils.streamlitcheck import check_streamlit
from haystack.pipelines import Pipeline
import pandas as pd
import logging

try:
    from termcolor import colored
except ImportError:
    pass

try:
    import streamlit as st
except ImportError:
    logging.info("Streamlit not installed")
def loadQueryClassifier():
    """
    Returns the Haystack query classifier model.
    model = shahrukhx01/bert-mini-finetune-question-detection
    """
    query_classifier = TransformersQueryClassifier(model_name_or_path=
                            "shahrukhx01/bert-mini-finetune-question-detection")
    return query_classifier
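
# Hedged usage sketch: the classifier node returns a (dict, edge_name) tuple;
# "output_1" marks a question/statement and any other edge marks keywords,
# which is exactly how QueryCheck below branches on result[1].
#
#   classifier = loadQueryClassifier()
#   _, edge = classifier.run(query="what are the water related issues?")
#   # edge == "output_1" -> question/statement, else keywords
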
class QueryCheck(BaseComponent):
    """
    Uses the Query Classifier from Haystack to process the query based on the
    query type. Its ability to detect statements is not very good, so
    statements may also get modified. Ex: "List water related issues" will be
    identified by the model as keywords, and will therefore be processed as
    "what are the List water related issues related issues and discussions?".
    This is one shortcoming, but it is ignored for now, as semantic search is
    not affected much by it. If you want to pass a list of keywords and do
    batch processing, use run_batch. Example: if you want to find relevant
    passages for water, food security and poverty, then
    querylist = ["water", "food security", "poverty"] and then execute
    QueryCheck.run_batch(queries = querylist)

    1. https://docs.haystack.deepset.ai/docs/query_classifier
    """
    outgoing_edges = 1

    def run(self, query: str):
        """
        Mandatory method to use the custom node. Determines the query type;
        if the query is of type keyword/statement, it will be modified to make
        it more useful for sentence transformers.

        Params
        --------
        query: query/statement/keywords in form of string

        Return
        ------
        output: dictionary, with key as identifier and value could be anything
            we need to return. In this case the output contains key = 'query'.
        output_1: As there is only one outgoing edge, we pass the 'output_1' string.
        """
        query_classifier = loadQueryClassifier()
        result = query_classifier.run(query=query)

        if result[1] == "output_1":
            output = {"query": query,
                      "query_type": 'question/statement'}
        else:
            output = {"query": "what are the {} related issues and \
                      discussions?".format(query),
                      "query_type": 'statements/keyword'}
        logging.info(output)
        return output, "output_1"

    def run_batch(self, queries: List[str]):
        """
        Runs multiple queries in one go; however, the queries need to be passed
        as a list of strings. Example: if you want to find relevant passages
        for water, food security and poverty, then
        querylist = ["water", "food security", "poverty"] and then execute
        QueryCheck.run_batch(queries = querylist)

        Params
        --------
        queries: queries/statements/keywords in form of string encapsulated
            within List

        Return
        ------
        output: dictionary, with key as identifier and value could be anything
            we need to return. In this case the output contains key = 'queries'.
        output_1: As there is only one outgoing edge, we pass the 'output_1' string.
        """
        query_classifier = loadQueryClassifier()
        query_list = []
        for query in queries:
            result = query_classifier.run(query=query)
            if result[1] == "output_1":
                query_list.append(query)
            else:
                query_list.append("what are the {} related issues and \
                                  discussions?".format(query))
        output = {'queries': query_list}
        logging.info(output)
        return output, "output_1"
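
# Hedged usage sketch, following the class docstring: batch-reformulate keywords.
#
#   querylist = ["water", "food security", "poverty"]
#   output, _ = QueryCheck().run_batch(queries=querylist)
#   # output['queries'] holds the (possibly reformulated) query strings
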
def runSemanticPreprocessingPipeline(file_path: str, file_name: str,
                        split_by: Literal["sentence", "word"] = 'sentence',
                        split_length: int = 2, split_overlap: int = 0,
                        split_respect_sentence_boundary: bool = False,
                        remove_punc: bool = False) -> List[Document]:
    """
    Creates the pipeline and runs the preprocessing pipeline.

    Params
    ------------
    file_name: filename, in case of streamlit application use
        st.session_state['filename']
    file_path: filepath, in case of streamlit application use
        st.session_state['filepath']
    split_by: document splitting strategy, either as word or sentence
    split_length: when synthetically creating the paragraphs from the document,
        it defines the length of a paragraph.
    split_overlap: Number of words or sentences that overlap when creating the
        paragraphs. This is done as one sentence or 'some words' make sense
        when read together with others. Therefore the overlap is used.
    split_respect_sentence_boundary: Used when using the 'word' strategy for
        splitting of text.
    remove_punc: whether to remove all punctuation, including ',' and '.'

    Return
    --------------
    List[Document]: When the preprocessing pipeline is run, the output dictionary
        has four objects. For the Haystack implementation of semantic search we
        need to use the List of Haystack Document, which can be fetched by
        key = 'documents' on the output.
    """
    semantic_processing_pipeline = processingpipeline()

    output_semantic_pre = semantic_processing_pipeline.run(file_paths=file_path,
                            params={"FileConverter": {"file_path": file_path,
                                                      "file_name": file_name},
                                    "UdfPreProcessor": {"remove_punc": remove_punc,
                                        "split_by": split_by,
                                        "split_length": split_length,
                                        "split_overlap": split_overlap,
                                        "split_respect_sentence_boundary": split_respect_sentence_boundary}})

    return output_semantic_pre
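
# Hedged usage sketch (the file path/name below are illustrative; in the
# Streamlit app they come from st.session_state as noted in the docstring):
#
#   output = runSemanticPreprocessingPipeline(file_path="data/report.pdf",
#                                             file_name="report.pdf")
#   paraList = output['documents']   # List[Document] used by the search pipeline
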
def loadRetriever(embedding_model: Text = None, embedding_model_format: Text = None,
                  embedding_layer: int = None, retriever_top_k: int = 10,
                  max_seq_len: int = 512, document_store: InMemoryDocumentStore = None):
    """
    Returns the Retriever model based on the params provided.
    1. https://docs.haystack.deepset.ai/docs/retriever#embedding-retrieval-recommended
    2. https://www.sbert.net/examples/applications/semantic-search/README.html
    3. https://github.com/deepset-ai/haystack/blob/main/haystack/nodes/retriever/dense.py

    Params
    ---------
    embedding_model: Name of the model to be used for embedding. Check the links
        provided in documentation
    embedding_model_format: check the github link of Haystack provided in
        documentation
    embedding_layer: check the github link of Haystack provided in documentation
    retriever_top_k: Number of top results to be returned by the retriever
    max_seq_len: every model has a max seq len it can handle; check the model
        card. Needed to handle the edge cases.
    document_store: InMemoryDocumentStore, write the haystack Document list to
        the DocumentStore and pass the same to the function call. Can be done
        using createDocumentStore from utils.

    Return
    -------
    retriever: embedding model
    """
    logging.info("loading retriever")
    if document_store is None:
        logging.warning("Retriever initialization requires the DocumentStore")
        return

    retriever = EmbeddingRetriever(
                embedding_model=embedding_model, top_k=retriever_top_k,
                document_store=document_store,
                emb_extraction_layer=embedding_layer, scale_score=True,
                model_format=embedding_model_format, use_gpu=True,
                max_seq_len=max_seq_len)
    if check_streamlit():
        st.session_state['retriever'] = retriever
    return retriever

def createDocumentStore(documents: List[Document], similarity: str = 'dot_product',
                        embedding_dim: int = 768):
    """
    Creates the InMemory Document Store from the haystack list of Documents.
    It is a mandatory component for the Retriever to work in the Haystack framework.

    Params
    -------
    documents: List of haystack documents. If using the preprocessing pipeline,
        can be fetched with key = 'documents' on the output of the preprocessing
        pipeline.
    similarity: scoring function, can be either 'cosine' or 'dot_product'
    embedding_dim: The document store has a default embedding size of 768, and
        the update_embeddings method of the Docstore cannot infer the embedding
        size of the retriever automatically; therefore set this value as per
        the model card.

    Return
    -------
    document_store: InMemory Document Store object type.
    """
    document_store = InMemoryDocumentStore(similarity=similarity,
                                           embedding_dim=embedding_dim)
    document_store.write_documents(documents)
    return document_store
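
# Hedged usage sketch wiring the store and retriever together (the model name
# is illustrative; embedding_dim must match the chosen model's card):
#
#   doc_store = createDocumentStore(documents=paraList, embedding_dim=768)
#   retriever = loadRetriever(
#       embedding_model="sentence-transformers/all-mpnet-base-v2",
#       embedding_model_format="sentence_transformers",
#       document_store=doc_store)
#   doc_store.update_embeddings(retriever)
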
def semanticSearchPipeline(documents: List[Document], embedding_model: Text = None,
                           embedding_model_format: Text = None, embedding_layer: int = None,
                           embedding_dim: int = 768, retriever_top_k: int = 10,
                           reader_model: str = None, reader_top_k: int = 10,
                           max_seq_len: int = 512, useQueryCheck=True,
                           top_k_per_candidate: int = 1):
    """
    Creates the semantic search pipeline and document store object from the
    list of haystack documents. The top_k for the Reader and Retriever are kept
    the same, so that all the results returned by the Retriever are used, while
    the context is extracted by the Reader for each retrieved result. The
    QueryCheck is added as a node to process the query. This pipeline is suited
    for keyword search, and to some extent for extractive QA purposes. The
    purpose of the Reader is strictly to highlight the context of a retrieved
    result and not QA; however, as stated, it can work for QA too in a limited sense.

    There are 4 variants of pipeline it can return:
    1. QueryCheck > Retriever > Reader
    2. Retriever > Reader
    3. QueryCheck > Retriever > Docs2Answers: If reader is None, then
       Docs2Answers is used to keep the output of the pipeline structurally the same.
    4. Retriever > Docs2Answers

    Links
    1. https://docs.haystack.deepset.ai/docs/retriever#embedding-retrieval-recommended
    2. https://www.sbert.net/examples/applications/semantic-search/README.html
    3. https://github.com/deepset-ai/haystack/blob/main/haystack/nodes/retriever/dense.py
    4. https://docs.haystack.deepset.ai/docs/reader

    Params
    ----------
    documents: list of Haystack Documents, returned by the preprocessing pipeline.
    embedding_model: Name of the model to be used for embedding. Check the links
        provided in documentation
    embedding_model_format: check the github link of Haystack provided in
        documentation
    embedding_layer: check the github link of Haystack provided in documentation
    embedding_dim: The document store has a default embedding size of 768, and
        the update_embeddings method of the Docstore cannot infer the embedding
        size of the retriever automatically; therefore set this value as per
        the model card.
    retriever_top_k: Number of top results to be returned by the retriever
    reader_model: Name of the model to be used for the Reader node in the Haystack
        Pipeline. Check the links provided in documentation
    reader_top_k: The Reader will use the retrieved results to further find better
        matches. As the purpose here is to use the Reader to extract context, the
        value is the same as retriever_top_k.
    max_seq_len: every model has a max seq len it can handle; check the model card.
        Needed to handle the edge cases.
    useQueryCheck: Whether to use the QueryCheck, which modifies the query, or not.
    top_k_per_candidate: How many answers to extract for each candidate doc
        that is coming from the retriever

    Return
    ---------
    semanticsearch_pipeline: Haystack Pipeline object, with all the necessary
        nodes [QueryCheck, Retriever, Reader/Docs2Answers]. If reader is None,
        then Docs2Answers is used to keep the output of the pipeline structurally
        the same.
    document_store: As the retriever can work only with a Haystack Document Store,
        the list of documents returned by the preprocessing pipeline is fed in to
        get an InMemoryDocumentStore object type, with the retriever updating the
        embeddings of each paragraph in the document store.
    """
    document_store = createDocumentStore(documents=documents,
                                         embedding_dim=embedding_dim)
    retriever = loadRetriever(embedding_model=embedding_model,
                              embedding_model_format=embedding_model_format,
                              embedding_layer=embedding_layer,
                              retriever_top_k=retriever_top_k,
                              document_store=document_store,
                              max_seq_len=max_seq_len)
    document_store.update_embeddings(retriever)
    semantic_search_pipeline = Pipeline()

    if useQueryCheck and reader_model:
        querycheck = QueryCheck()
        reader = FARMReader(model_name_or_path=reader_model,
                            top_k=reader_top_k, use_gpu=True,
                            top_k_per_candidate=top_k_per_candidate)
        semantic_search_pipeline.add_node(component=querycheck,
                            name="QueryCheck", inputs=["Query"])
        semantic_search_pipeline.add_node(component=retriever,
                            name="EmbeddingRetriever", inputs=["QueryCheck.output_1"])
        semantic_search_pipeline.add_node(component=reader, name="FARMReader",
                            inputs=["EmbeddingRetriever"])
    elif reader_model:
        reader = FARMReader(model_name_or_path=reader_model,
                            top_k=reader_top_k, use_gpu=True,
                            top_k_per_candidate=top_k_per_candidate)
        semantic_search_pipeline.add_node(component=retriever,
                            name="EmbeddingRetriever", inputs=["Query"])
        semantic_search_pipeline.add_node(component=reader,
                            name="FARMReader", inputs=["EmbeddingRetriever"])
    elif useQueryCheck and not reader_model:
        querycheck = QueryCheck()
        docs2answers = Docs2Answers()
        semantic_search_pipeline.add_node(component=querycheck,
                            name="QueryCheck", inputs=["Query"])
        semantic_search_pipeline.add_node(component=retriever,
                            name="EmbeddingRetriever", inputs=["QueryCheck.output_1"])
        semantic_search_pipeline.add_node(component=docs2answers,
                            name="Docs2Answers", inputs=["EmbeddingRetriever"])
    elif not useQueryCheck and not reader_model:
        docs2answers = Docs2Answers()
        semantic_search_pipeline.add_node(component=retriever,
                            name="EmbeddingRetriever", inputs=["Query"])
        semantic_search_pipeline.add_node(component=docs2answers,
                            name="Docs2Answers", inputs=["EmbeddingRetriever"])

    logging.info(semantic_search_pipeline.components)
    return semantic_search_pipeline, document_store
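
# Hedged usage sketch: builds variant 1 (QueryCheck > Retriever > Reader);
# the model names are illustrative, not prescribed by this module.
#
#   pipeline, doc_store = semanticSearchPipeline(
#       documents=paraList,
#       embedding_model="sentence-transformers/all-mpnet-base-v2",
#       embedding_model_format="sentence_transformers",
#       reader_model="deepset/tinyroberta-squad2")
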
def runSemanticPipeline(pipeline: Pipeline, queries: Union[list, str]) -> dict:
    """
    Will use the haystack run or run_batch, based on whether a single query is
    passed as a string or multiple queries as List[str].

    Params
    -------
    pipeline: haystack pipeline, this is the same as returned by
        semanticSearchPipeline from utils.semanticsearch
    queries: Either a single query or a list of queries.

    Return
    -------
    results: Dict containing answers and documents as keys and their respective
        values
    """
    if isinstance(queries, list):
        results = pipeline.run_batch(queries=queries)
    elif isinstance(queries, str):
        results = pipeline.run(query=queries)
    else:
        logging.info("Please check the input type for the queries")
        return

    return results
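
# Hedged usage sketch of the dispatch above:
#
#   raw_output = runSemanticPipeline(pipeline, "water")                # pipeline.run
#   raw_output = runSemanticPipeline(pipeline, ["water", "poverty"])   # pipeline.run_batch
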
def process_query_output(results: dict) -> pd.DataFrame:
    """
    Returns the dataframe with the necessary information, including
    ['query','answer','answer_offset','context_offset','context','content',
    'reader_score','retriever_score','id']. This is designed for the output
    given by the semantic search pipeline with a single query and the Reader
    as final node. The output of a pipeline having Docs2Answers as final node,
    or of multiple queries, needs to be handled separately. In these other
    cases, use process_semantic_output from utils.semantic_search, which uses
    this function internally to make one combined dataframe.

    Params
    ---------
    results: this dictionary should have key, values with
        keys = [query, answers, documents]; however, answers is optional.
        In case of [Docs2Answers as final node], process_semantic_output
        doesn't pass answers, thereby setting all values contained in
        answers to 'None'

    Return
    --------
    df: dataframe with all the columns mentioned in the function description.
    """
    query_text = results['query']
    if 'answers' in results.keys():
        answer_dict = {}

        for answer in results['answers']:
            answer_dict[answer.document_id] = answer.to_dict()
    else:
        answer_dict = {}
    docs = results['documents']
    df = pd.DataFrame(columns=['query', 'answer', 'answer_offset', 'context_offset',
                               'context', 'content', 'reader_score', 'retriever_score',
                               'id'])
    for doc in docs:
        row_list = {}
        row_list['query'] = query_text
        row_list['retriever_score'] = doc.score
        row_list['id'] = doc.id
        row_list['content'] = doc.content
        if doc.id in answer_dict.keys():
            row_list['answer'] = answer_dict[doc.id]['answer']
            row_list['context'] = answer_dict[doc.id]['context']
            row_list['reader_score'] = answer_dict[doc.id]['score']
            answer_offset = answer_dict[doc.id]['offsets_in_document'][0]
            row_list['answer_offset'] = [answer_offset['start'], answer_offset['end']]
            start_idx = doc.content.find(row_list['context'])
            end_idx = start_idx + len(row_list['context'])
            row_list['context_offset'] = [start_idx, end_idx]
        else:
            row_list['answer'] = None
            row_list['context'] = None
            row_list['reader_score'] = None
            row_list['answer_offset'] = None
            row_list['context_offset'] = None
        df_dictionary = pd.DataFrame([row_list])
        df = pd.concat([df, df_dictionary], ignore_index=True)

    return df
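
# Hedged usage sketch (single query, Reader as final node, per the docstring):
#
#   df = process_query_output(raw_output)
#   df[['query', 'answer', 'retriever_score']].head()
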
def process_semantic_output(results):
    """
    Returns the dataframe with the necessary information, including
    ['query','answer','answer_offset','context_offset','context','content',
    'reader_score','retriever_score','id']. Distinguishes whether it is a
    single query or multiple queries by reading the pipeline output dictionary
    keys. Uses process_query_output to get the dataframe for each query and
    creates one concatenated dataframe. In case of Docs2Answers as final node,
    drops the answers part. See the documentation of process_query_output.

    Params
    ---------
    results: raw output of runSemanticPipeline.

    Return
    --------
    df: dataframe with all the columns mentioned in the function description.
    """
    output = {}
    if 'query' in results.keys():
        output['query'] = results['query']
        output['documents'] = results['documents']
        if results['node_id'] == 'Docs2Answers':
            pass
        else:
            output['answers'] = results['answers']
        df = process_query_output(output)
        return df
    if 'queries' in results.keys():
        df = pd.DataFrame(columns=['query', 'answer', 'answer_offset',
                                   'context_offset', 'context', 'content',
                                   'reader_score', 'retriever_score', 'id'])
        for query, answers, documents in zip(results['queries'],
                                             results['answers'], results['documents']):
            output = {}
            output['query'] = query
            output['documents'] = documents
            if results['node_id'] == 'Docs2Answers':
                pass
            else:
                output['answers'] = answers
            temp = process_query_output(output)
            df = pd.concat([df, temp], ignore_index=True)

        return df
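
# Hedged usage sketch: handles both the single-query and batch outputs of
# runSemanticPipeline.
#
#   results_df = process_semantic_output(raw_output)
#   results_df = results_df.sort_values(by=['retriever_score'], ascending=False)
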
def semanticsearchAnnotator(matches: List[List[int]], document: Text):
    """
    Annotates the text in the document defined by a list of [start index, end index].
    Example: for "How are you today", if the document type is text,
    matches = [[0,3]] will give answer = "How". However, had the spacy matcher
    been used, matches = [[0,3]] would give answer = "How are you", and finding
    "How" with spacy would need matches = [[0,1]] for the string defined above.
    """
    start = 0
    annotated_text = ""
    for match in matches:
        start_idx = match[0]
        end_idx = match[1]

        if check_streamlit():
            annotated_text = (annotated_text + document[start:start_idx]
                              + str(annotation(body=document[start_idx:end_idx],
                                label="Context", background="#964448", color='#ffffff')))
        else:
            annotated_text = (annotated_text + document[start:start_idx]
                              + colored(document[start_idx:end_idx],
                                        "green", attrs=['bold']))
        start = end_idx

    annotated_text = annotated_text + document[end_idx:]

    if check_streamlit():
        st.write(
            markdown(annotated_text),
            unsafe_allow_html=True,
        )
    else:
        print(annotated_text)
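
# Hedged usage sketch from the docstring example: highlight "How" via plain
# character offsets [start, end).
#
#   semanticsearchAnnotator(matches=[[0, 3]], document="How are you today")
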
def semantic_keywordsearch(query: Text, documents: List[Document],
                           embedding_model: Text,
                           embedding_model_format: Text,
                           embedding_layer: int, reader_model: str,
                           retriever_top_k: int = 10, reader_top_k: int = 10,
                           return_results: bool = False, embedding_dim: int = 768,
                           max_seq_len: int = 512, top_k_per_candidate: int = 1,
                           sort_by: Literal["retriever", "reader"] = 'retriever'):
    """
    Performs the semantic search on the list of haystack documents which is
    returned by the preprocessing pipeline.

    Params
    -------
    query: Keywords that need to be searched in the documents.
    documents: List of Haystack documents returned by the preprocessing pipeline.
    """
    semanticsearch_pipeline, doc_store = semanticSearchPipeline(documents=documents,
                                    embedding_model=embedding_model,
                                    embedding_layer=embedding_layer,
                                    embedding_model_format=embedding_model_format,
                                    reader_model=reader_model, retriever_top_k=retriever_top_k,
                                    reader_top_k=reader_top_k, embedding_dim=embedding_dim,
                                    max_seq_len=max_seq_len,
                                    top_k_per_candidate=top_k_per_candidate)
    raw_output = runSemanticPipeline(semanticsearch_pipeline, query)
    results_df = process_semantic_output(raw_output)
    if sort_by == 'retriever':
        results_df = results_df.sort_values(by=['retriever_score'], ascending=False)
    else:
        results_df = results_df.sort_values(by=['reader_score'], ascending=False)

    if return_results:
        return results_df
    else:
        if check_streamlit():
            st.markdown("##### Top few semantic search results #####")
        else:
            print("Top few semantic search results")
        for i in range(len(results_df)):
            if check_streamlit():
                st.write("Result {}".format(i+1))
            else:
                print("Result {}".format(i+1))
            semanticsearchAnnotator([results_df.loc[i]['context_offset']],
                                    results_df.loc[i]['content'])
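
# Hedged end-to-end sketch (model names illustrative; paraList as returned by
# runSemanticPreprocessingPipeline above):
#
#   semantic_keywordsearch(
#       query="water", documents=paraList,
#       embedding_model="sentence-transformers/all-mpnet-base-v2",
#       embedding_model_format="sentence_transformers",
#       embedding_layer=None,
#       reader_model="deepset/tinyroberta-squad2")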