import streamlit as st
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import nltk
from transformers.models.roberta.modeling_roberta import *
from transformers import RobertaForQuestionAnswering
from nltk import word_tokenize
import json
import pandas as pd
# import re
import base64
# Set the background image
def set_bg_hack(main_bg):
    '''
    Unpack an image from the root folder and set it as the app background.

    Returns
    -------
    The background.
    '''
    # set bg extension
    main_bg_ext = "png"

    st.markdown(
        f"""
        <style>
        .stApp {{
            background: url(data:image/{main_bg_ext};base64,{base64.b64encode(open(main_bg, "rb").read()).decode()});
            background-size: cover;
        }}
        </style>
        """,
        unsafe_allow_html=True
    )
set_bg_hack("Background.png")
image_url = "logo1.png"
# Display the image without a caption, scaled down to a small size
st.image(image_url, width=100)
# Download punkt for nltk
print("===================================================================")
@st.cache_data
def download_nltk_punkt():
    nltk.download('punkt')
# Cache loading PhoBert model and tokenizer
@st.cache_resource
def load_phoBert():
    model = AutoModelForSequenceClassification.from_pretrained('minhdang14902/PhoBert_Edu')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/PhoBert_Edu')
    return model, tokenizer
# Call the cached functions
download_nltk_punkt()
phoBert_model, phoBert_tokenizer = load_phoBert()
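# Note: the first run downloads both checkpoints from the Hugging Face Hub;
# thanks to the cache decorators, later Streamlit reruns reuse the loaded
# objects instead of reloading them.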
# Initialize the pipeline with the loaded PhoBert model and tokenizer
chatbot_pipeline = pipeline("sentiment-analysis", model=phoBert_model, tokenizer=phoBert_tokenizer)
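# "sentiment-analysis" is simply transformers' alias for the generic
# text-classification pipeline, so this scores intent tags, not sentiment.
# Illustrative call (label/score values are made up):
#   chatbot_pipeline("some question")  # -> [{'label': '<tag>', 'score': 0.97}]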
# Load spaCy Vietnamese model
# nlp = spacy.load('vi_core_news_lg')
# Load intents from json file
def load_json_file(filename):
    with open(filename) as f:
        file = json.load(f)
    return file
filename = './QA_Legal_converted_merged.json'
intents = load_json_file(filename)
def create_df():
    df = pd.DataFrame({
        'Pattern': [],
        'Tag': []
    })
    return df
df = create_df()
def extract_json_info(json_file, df):
    for intent in json_file['intents']:
        for pattern in intent['patterns']:
            sentence_tag = [pattern, intent['tag']]
            df.loc[len(df.index)] = sentence_tag
    return df
df = extract_json_info(intents, df)
df2 = df.copy()
labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
id2label = {id: label for id, label in enumerate(labels)}
label2id = {label: id for id, label in enumerate(labels)}
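# Illustrative shape of the mappings (actual tags come from the JSON file):
#   id2label = {0: 'tag_a', 1: 'tag_b', ...}
#   label2id = {'tag_a': 0, 'tag_b': 1, ...}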
# def tokenize_with_spacy(text):
#     doc = nlp(text)
#     tokens = [token.text for token in doc]
#     tokenized_text = ' '.join(tokens)
#     return tokenized_text

# Extractive QA model on top of Roberta. Sub-word embeddings are pooled back
# to word positions through an alignment matrix, so start/end logits are
# predicted per word rather than per sub-word token.
class MRCQuestionAnswering(RobertaPreTrainedModel):
    config_class = RobertaConfig

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        self.init_weights()

    def forward(self, input_ids=None, words_lengths=None, attention_mask=None,
                token_type_ids=None, position_ids=None, head_mask=None,
                inputs_embeds=None, start_positions=None, end_positions=None,
                output_attentions=None, output_hidden_states=None, return_dict=None):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        context_embedding = outputs[0]

        # Alignment matrix: one row per word, with ones over the span of
        # sub-word tokens that the word was split into.
        batch_size = input_ids.shape[0]
        max_sub_word = input_ids.shape[1]
        max_word = words_lengths.shape[1]
        align_matrix = torch.zeros((batch_size, max_word, max_sub_word))
        for i, sample_length in enumerate(words_lengths):
            for j in range(len(sample_length)):
                start_idx = torch.sum(sample_length[:j])
                align_matrix[i][j][start_idx: start_idx + sample_length[j]] = 1 if sample_length[j] > 0 else 0
        align_matrix = align_matrix.to(context_embedding.device)
        # Word-level embeddings: (batch, max_word, hidden_size)
        context_embedding_align = torch.bmm(align_matrix, context_embedding)

        logits = self.qa_outputs(context_embedding_align)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        total_loss = None
        if start_positions is not None and end_positions is not None:
            # If we are on multi-GPU, split adds an extra dimension.
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            # Positions outside the input are clamped and ignored by the loss.
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        if not return_dict:
            output = (start_logits, end_logits) + outputs[2:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
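# Shape sketch for a single sample: with input_ids of shape (1, n_subwords)
# and words_lengths of shape (1, n_words), bmm((1, n_words, n_subwords),
# (1, n_subwords, hidden)) gives word-level embeddings of shape
# (1, n_words, hidden), so start_logits/end_logits have shape (1, n_words).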
# Cache loading Roberta model and tokenizer
@st.cache_resource
def load_roberta_model():
    model = MRCQuestionAnswering.from_pretrained('minhdang14902/Roberta_edu')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/Roberta_edu')
    return model, tokenizer
roberta_model, roberta_tokenizer = load_roberta_model()
def chatRoberta(text):
    # Classify the question into an intent, then use that intent's response
    # text as the context for extractive QA.
    label = label2id[chatbot_pipeline(text)[0]['label']]
    response = intents['intents'][label]['responses']
    print(response[0])

    QA_input = {
        'question': text,
        'context': response[0]
    }

    # Tokenize input
    encoded_input = tokenize_function(QA_input, roberta_tokenizer)

    # Prepare batch samples
    batch_samples = data_collator([encoded_input], roberta_tokenizer)

    # Model prediction
    roberta_model.eval()
    with torch.no_grad():
        inputs = {
            'input_ids': batch_samples['input_ids'],
            'attention_mask': batch_samples['attention_mask'],
            'words_lengths': batch_samples['words_lengths'],
        }
        outputs = roberta_model(**inputs)

    # Extract answer
    result = extract_answer([encoded_input], outputs, roberta_tokenizer)
    context = response[0]
    return result, context
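# Illustrative return value (scores are made up):
#   result, context = chatRoberta("some question")
#   result  -> [{'answer': '...', 'score_start': 0.93, 'score_end': 0.91}]
#   context -> the full response text of the predicted intent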
def tokenize_function(example, tokenizer):
    question_word = word_tokenize(example["question"])
    context_word = word_tokenize(example["context"])

    question_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in question_word]
    context_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in context_word]
    valid = True
    if len([j for i in question_sub_words_ids + context_sub_words_ids for j in i]) > tokenizer.model_max_length - 1:
        valid = False

    question_sub_words_ids = [[tokenizer.bos_token_id]] + question_sub_words_ids + [[tokenizer.eos_token_id]]
    context_sub_words_ids = context_sub_words_ids + [[tokenizer.eos_token_id]]

    input_ids = [j for i in question_sub_words_ids + context_sub_words_ids for j in i]
    if len(input_ids) > tokenizer.model_max_length:
        valid = False

    words_lengths = [len(item) for item in question_sub_words_ids + context_sub_words_ids]

    return {
        "input_ids": input_ids,
        "words_lengths": words_lengths,
        "valid": valid
    }
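# Illustrative output structure (ids and lengths are hypothetical):
#   tokenize_function({'question': '...', 'context': '...'}, roberta_tokenizer)
#   -> {'input_ids': [0, ..., 2, ..., 2], 'words_lengths': [1, 2, ...], 'valid': True}
# words_lengths[i] is the number of sub-word tokens that word i (including the
# <s>/</s> specials) was split into; the model uses it to rebuild word spans.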
def data_collator(samples, tokenizer):
    if len(samples) == 0:
        return {}

    def collate_tokens(values, pad_idx, eos_idx=None, left_pad=False, move_eos_to_beginning=False):
        # Pad a list of 1-D tensors into a single 2-D batch tensor.
        size = max(v.size(0) for v in values)
        res = values[0].new(len(values), size).fill_(pad_idx)

        def copy_tensor(src, dst):
            assert dst.numel() == src.numel()
            if move_eos_to_beginning:
                assert src[-1] == eos_idx
                dst[0] = eos_idx
                dst[1:] = src[:-1]
            else:
                dst.copy_(src)

        for i, v in enumerate(values):
            copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
        return res

    input_ids = collate_tokens([torch.tensor(item['input_ids']) for item in samples], pad_idx=tokenizer.pad_token_id)
    attention_mask = torch.zeros_like(input_ids)
    for i in range(len(samples)):
        attention_mask[i][:len(samples[i]['input_ids'])] = 1
    words_lengths = collate_tokens([torch.tensor(item['words_lengths']) for item in samples], pad_idx=0)

    batch_samples = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'words_lengths': words_lengths,
    }
    return batch_samples
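# The collator right-pads input_ids with the tokenizer's pad id and
# words_lengths with 0, and marks only real tokens in attention_mask, so
# padded positions contribute nothing to the alignment matrix in forward().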
def extract_answer(inputs, outputs, tokenizer):
    plain_result = []
    for sample_input, start_logit, end_logit in zip(inputs, outputs.start_logits, outputs.end_logits):
        sample_words_length = sample_input['words_lengths']
        input_ids = sample_input['input_ids']
        # Map the word-level argmax positions back to sub-word offsets.
        answer_start = sum(sample_words_length[:torch.argmax(start_logit)])
        answer_end = sum(sample_words_length[:torch.argmax(end_logit) + 1])

        if answer_start <= answer_end:
            answer = tokenizer.convert_tokens_to_string(
                tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
            if answer == tokenizer.bos_token:
                answer = ''
        else:
            answer = ''

        score_start = torch.max(torch.softmax(start_logit, dim=-1)).cpu().detach().numpy().tolist()
        score_end = torch.max(torch.softmax(end_logit, dim=-1)).cpu().detach().numpy().tolist()

        plain_result.append({
            "answer": answer,
            "score_start": score_start,
            "score_end": score_end
        })
    return plain_result
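# The argmax over start/end logits picks word positions; summing words_lengths
# up to that word converts it to a sub-word offset into input_ids, which is
# why the answer text is decoded from input_ids[answer_start:answer_end].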
st.title("Chatbot HUIT")
st.write("Hi! Tôi là trợ lý của bạn trong việc trả lời các câu hỏi về trường HUIT. Nếu câu trả lời trống trơn, đừng lo, chỉ là hệ thống không thấy câu trả lời phù hợp!!")
text = st.text_input("User: ", key="input")
if 'chat_history' not in st.session_state:
    st.session_state['chat_history'] = []
def get_response(text):
    st.subheader("The Answer is:")
    st.write(text)
    answer, context = chatRoberta(text)
    result = answer[0]['answer']
    # Fall back to the full context when the model returns an empty answer.
    if result == "":
        return context
    return result
if st.button("Chat!"):
    st.session_state['chat_history'].append(("User", text))
    response = get_response(text)
    st.subheader("The Response is:")
    message = st.empty()
    result = ""
    # Stream the response character by character with a cursor effect.
    for chunk in response:
        result += chunk
        message.markdown(result + "❚ ")
    message.markdown(result)
    st.session_state['chat_history'].append(("Bot", result))
for i, (sender, message) in enumerate(st.session_state['chat_history']):
    if sender == "User":
        st.text_area("User:", value=message, height=100, max_chars=None, key=f"user_{i}")
    else:
        st.text_area("Bot:", value=message, height=100, max_chars=None, key=f"bot_{i}")