ragapp / text2vec.py
fmyaidha's picture
Upload 18 files
6b94ed1 verified
raw
history blame
6.61 kB
import os
import torch
from functional import seq
import numpy as np
import torch.nn.functional as F
from torch import cosine_similarity
from config import Config
from openai import OpenAI
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
class TextVector():
def __init__(self, cfg):
self.bert_path = cfg.bert_path
# 从配置文件读取API相关设置
self.use_api = getattr(cfg, 'use_api', True)
self.api_key = getattr(cfg, 'api_key', "sk-5b45aa67249a44d38abca3c02cc78a70")
self.base_url = getattr(cfg, 'base_url', "https://dashscope.aliyuncs.com/compatible-mode/v1")
self.model_name = getattr(cfg, 'model_name', "text-embedding-v3")
self.dimensions = getattr(cfg, 'dimensions', 1024)
self.batch_size = getattr(cfg, 'batch_size', 10)
# 只有在不使用API时才加载本地模型
if not self.use_api:
self.load_model()
def load_model(self):
"""载入模型"""
self.tokenizer = AutoTokenizer.from_pretrained(self.bert_path)
self.model = AutoModel.from_pretrained(self.bert_path)
def mean_pooling(self, model_output, attention_mask):
"""采用序列mean-pooling获得句子的表征向量"""
token_embeddings = model_output[0] # First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
def get_vec(self, sentences):
"""通过模型获取句子的向量"""
if self.use_api:
# 如果使用API,重定向到API方法
return self.get_vec_api(sentences)
# 否则使用原始BERT方法
encoded_input = self.tokenizer(sentences, padding=True, truncation=True, return_tensors='pt')
with torch.no_grad():
model_output = self.model(**encoded_input)
sentence_embeddings = self.mean_pooling(model_output, encoded_input['attention_mask'])
sentence_embeddings = sentence_embeddings.data.cpu().numpy().tolist()
return sentence_embeddings
def get_vec_api(self, query, batch_size=None):
"""通过API获取句子的向量"""
if batch_size is None:
batch_size = self.batch_size
# 空查询检查
if not query:
print("Warning: Empty query provided to get_vec_api")
return []
client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
if isinstance(query, str):
query = [query]
# 移除空字符串和None值,确保输入数据有效
query = [q for q in query if q and isinstance(q, str) and q.strip()]
if not query:
print("Warning: No valid text to vectorize after filtering")
return []
all_vectors = []
retry_count = 0
max_retries = 2 # 允许重试几次
while retry_count <= max_retries and not all_vectors:
try:
for i in range(0, len(query), batch_size):
batch = query[i:i + batch_size]
try:
completion = client.embeddings.create(
model=self.model_name,
input=batch,
dimensions=self.dimensions,
encoding_format="float"
)
vectors = [embedding.embedding for embedding in completion.data]
all_vectors.extend(vectors)
except Exception as e:
print(f"向量化批次 {i//batch_size + 1} 失败:{str(e)}")
# 不立即返回空数组,继续处理其他批次
continue
# 检查是否有成功获取的向量
if all_vectors:
break
else:
retry_count += 1
print(f"未获取到任何向量,第 {retry_count} 次重试...")
except Exception as outer_e:
print(f"向量化过程中发生错误:{str(outer_e)}")
retry_count += 1
if retry_count <= max_retries:
print(f"第 {retry_count} 次重试...")
# 返回向量数组,如果仍然为空,确保返回一个正确形状的空数组
if not all_vectors and self.dimensions > 0:
print("Warning: 返回一个空的向量数组,形状为 [0, dimensions]")
return np.zeros((0, self.dimensions))
return all_vectors
def get_vec_batch(self, data, bs=None):
"""batch方式获取,提高效率"""
if bs is None:
bs = self.batch_size
if self.use_api:
# 如果使用API,直接调用API方法
vectors = self.get_vec_api(data, bs)
return torch.tensor(np.array(vectors)) if len(vectors) > 0 else torch.tensor(np.array([]))
# 否则使用原始BERT方法
data = seq(data).grouped(bs)
all_vectors = []
for batch in data:
vecs = self.get_vec(batch)
all_vectors.extend(vecs)
all_vectors = torch.tensor(np.array(all_vectors))
return all_vectors
def vector_similarity(self, vectors):
"""以[query,text1,text2...]来计算query与text1,text2,...的cosine相似度"""
# Add dimension checking to prevent errors
if vectors.size(0) <= 1:
print("Warning: Not enough vectors for similarity calculation")
return []
if len(vectors.shape) < 2:
print("Warning: Vectors must be 2-dimensional")
return []
vectors = F.normalize(vectors, p=2, dim=1)
q_vec = vectors[0,:]
o_vec = vectors[1:,:]
sim = cosine_similarity(q_vec, o_vec)
sim = sim.data.cpu().numpy().tolist()
return sim
# 修正函数名拼写错误:get_vec_bath -> get_vec_batch
cfg = Config()
tv = TextVector(cfg)
get_vector = tv.get_vec_batch # 修正名称
get_sim = tv.vector_similarity