import os

# Tolerate duplicate OpenMP runtimes (common with PyTorch + MKL); set this
# before torch initializes OpenMP so it takes effect.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import numpy as np
import torch
import torch.nn.functional as F
from functional import seq
from openai import OpenAI
from torch import cosine_similarity
from transformers import AutoTokenizer, AutoModel  # needed by load_model

from config import Config


class TextVector:
    def __init__(self, cfg):
        self.bert_path = cfg.bert_path
        # Read API-related settings from the config, with fallback defaults
        self.use_api = getattr(cfg, 'use_api', True)
        # Read the key from the environment (e.g. DASHSCOPE_API_KEY) rather than
        # hardcoding a secret in source
        self.api_key = getattr(cfg, 'api_key', os.environ.get("DASHSCOPE_API_KEY", ""))
        self.base_url = getattr(cfg, 'base_url', "https://dashscope.aliyuncs.com/compatible-mode/v1")
        self.model_name = getattr(cfg, 'model_name', "text-embedding-v3")
        self.dimensions = getattr(cfg, 'dimensions', 1024)
        self.batch_size = getattr(cfg, 'batch_size', 10)
        # Only load the local model when the API is not used
        if not self.use_api:
            self.load_model()
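
    # Expected Config attributes (the defaults above apply when one is missing):
    #   bert_path  - path to the local BERT model (required)
    #   use_api    - True to call the embeddings API instead of local BERT
    #   api_key, base_url, model_name, dimensions, batch_size - API settings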

    def load_model(self):
        """Load the local tokenizer and model from bert_path."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.bert_path)
        self.model = AutoModel.from_pretrained(self.bert_path)
        self.model.eval()  # inference only; disable dropout etc.

    def mean_pooling(self, model_output, attention_mask):
        """Mean-pool token embeddings over the sequence to get a sentence vector."""
        token_embeddings = model_output[0]  # first element of model_output holds all token embeddings
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
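        # Illustrative shapes: token_embeddings [B, T, H], attention_mask [B, T];
        # the expanded mask zeroes out padding, and the masked sum over T divided
        # by the real token count yields one [B, H] sentence embedding per input.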
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def get_vec(self, sentences):
        """Get sentence vectors from the model."""
        if self.use_api:
            # Delegate to the API-based method when the API is enabled
            return self.get_vec_api(sentences)
        # Otherwise use the local BERT model
        encoded_input = self.tokenizer(sentences, padding=True, truncation=True, return_tensors='pt')
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        sentence_embeddings = self.mean_pooling(model_output, encoded_input['attention_mask'])
        sentence_embeddings = sentence_embeddings.data.cpu().numpy().tolist()
        return sentence_embeddings

    def get_vec_api(self, query, batch_size=None):
        """Get sentence vectors via the embeddings API."""
        if batch_size is None:
            batch_size = self.batch_size
        # Guard against an empty query
        if not query:
            print("Warning: Empty query provided to get_vec_api")
            return []
        client = OpenAI(
            api_key=self.api_key,
            base_url=self.base_url
        )
        if isinstance(query, str):
            query = [query]
        # Drop empty strings and non-string values so the input is valid
        query = [q for q in query if q and isinstance(q, str) and q.strip()]
        if not query:
            print("Warning: No valid text to vectorize after filtering")
            return []
        all_vectors = []
        retry_count = 0
        max_retries = 2  # allow a couple of retries
        while retry_count <= max_retries and not all_vectors:
            try:
                for i in range(0, len(query), batch_size):
                    batch = query[i:i + batch_size]
                    try:
                        completion = client.embeddings.create(
                            model=self.model_name,
                            input=batch,
                            dimensions=self.dimensions,
                            encoding_format="float"
                        )
                        vectors = [embedding.embedding for embedding in completion.data]
                        all_vectors.extend(vectors)
                    except Exception as e:
                        print(f"Vectorizing batch {i // batch_size + 1} failed: {str(e)}")
                        # Do not return immediately; keep processing the remaining batches
                        continue
                # Check whether any vectors were obtained
                if all_vectors:
                    break
                else:
                    retry_count += 1
                    print(f"No vectors obtained, retry {retry_count}...")
            except Exception as outer_e:
                print(f"Error during vectorization: {str(outer_e)}")
                retry_count += 1
                if retry_count <= max_retries:
                    print(f"Retry {retry_count}...")
        # Return the vectors; if still empty, return an empty array of the right shape
        if not all_vectors and self.dimensions > 0:
            print("Warning: returning an empty vector array of shape [0, dimensions]")
            return np.zeros((0, self.dimensions))
        return all_vectors

    def get_vec_batch(self, data, bs=None):
        """Fetch vectors batch by batch for efficiency."""
        if bs is None:
            bs = self.batch_size
        if self.use_api:
            # When the API is enabled, call the API method directly
            vectors = self.get_vec_api(data, bs)
            return torch.tensor(np.array(vectors)) if len(vectors) > 0 else torch.tensor(np.array([]))
        # Otherwise use the local BERT model
        data = seq(data).grouped(bs)
        all_vectors = []
        for batch in data:
            vecs = self.get_vec(list(batch))  # plain list for the tokenizer
            all_vectors.extend(vecs)
        all_vectors = torch.tensor(np.array(all_vectors))
        return all_vectors

    def vector_similarity(self, vectors):
        """Given vectors stacked as [query, text1, text2, ...], compute the cosine
        similarity between the query and each text."""
        # Check the shape first so the size check below cannot misfire
        if len(vectors.shape) < 2:
            print("Warning: Vectors must be 2-dimensional")
            return []
        if vectors.size(0) <= 1:
            print("Warning: Not enough vectors for similarity calculation")
            return []
        vectors = F.normalize(vectors, p=2, dim=1)
        q_vec = vectors[0, :]   # query vector
        o_vec = vectors[1:, :]  # candidate text vectors
        sim = cosine_similarity(q_vec, o_vec)  # broadcasts the query against each row
        sim = sim.data.cpu().numpy().tolist()
        return sim


# Fixed function-name typo: get_vec_bath -> get_vec_batch
cfg = Config()
tv = TextVector(cfg)
get_vector = tv.get_vec_batch  # corrected name
get_sim = tv.vector_similarity
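

# Minimal usage sketch (illustrative, not part of the original module). It assumes
# a Config providing the attributes read in __init__ and, in API mode, a valid key;
# the sample sentences below are made up for demonstration.
if __name__ == "__main__":
    texts = [
        "what is mean pooling?",                # the query
        "mean pooling averages token vectors",  # candidate text 1
        "bananas are yellow",                   # candidate text 2
    ]
    vecs = get_vector(texts)  # tensor of shape [3, dimensions]
    if len(vecs) > 1:
        sims = get_sim(vecs)  # cosine similarity of the query to each candidate
        print(sims)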