外观
完整示例
约 2119 字大约 7 分钟
2025-04-07
import os
from openai import OpenAI
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List, Optional, Tuple, Union
import ollama
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
import re
from bs4 import BeautifulSoup
from IPython.display import display, Code, Markdown
class BaseEmbeddings:
"""
向量化的基类,用于将文本转换为向量表示。不同的子类可以实现不同的向量获取方法。
"""
def __init__(self, path: str, is_api: bool) -> None:
"""
初始化基类。
参数:
path (str) - 如果是本地模型,path 表示模型路径;如果是API模式,path可以为空
is_api (bool) - 表示是否使用API调用,如果为True表示通过API获取Embedding
"""
self.path = path
self.is_api = is_api
def get_embedding(self, text: str, model: str) -> List[float]:
"""
抽象方法,用于获取文本的向量表示,具体实现需要在子类中定义。
参数:
text (str) - 需要转换为向量的文本
model (str) - 所使用的模型名称
返回:
list[float] - 文本的向量表示
"""
raise NotImplementedError
@classmethod
def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
"""
计算两个向量之间的余弦相似度,用于衡量它们的相似程度。
参数:
vector1 (list[float]) - 第一个向量
vector2 (list[float]) - 第二个向量
返回:
float - 余弦相似度值,范围从 -1 到 1,越接近 1 表示向量越相似
"""
dot_product = np.dot(vector1, vector2) # 向量点积
magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2) # 向量的模
if not magnitude:
return 0
return dot_product / magnitude # 计算余弦相似度
class OpenAIEmbedding(BaseEmbeddings):
"""
使用 OpenAI 的 Embedding API 来获取文本向量的类,继承自 BaseEmbeddings。
"""
def __init__(self, path: str = '', is_api: bool = True) -> None:
"""
初始化类,设置 OpenAI API 客户端,如果使用的是 API 调用。
参数:
path (str) - 本地模型的路径,使用API时可以为空
is_api (bool) - 是否通过 API 获取 Embedding,默认为 True
"""
super().__init__(path, is_api)
if self.is_api:
# 初始化 OpenAI API 客户端
from openai import OpenAI
self.client = OpenAI()
self.client.api_key = os.getenv("OPENAI_API_KEY") # 从环境变量中获取 API 密钥
self.client.base_url = os.getenv("OPENAI_BASE_URL") # 从环境变量中获取 API 基础URL
def get_embedding(self, text: str, model: str = "mxbai-embed-large") -> List[float]:
"""
使用 OpenAI 的 Embedding API 获取文本的向量表示。
参数:
text (str) - 需要转化为向量的文本
model (str) - 使用的 Embedding 模型名称,默认为 'mxbai-embed-large'
返回:
list[float] - 文本的向量表示
"""
if self.is_api:
# 去掉文本中的换行符,保证输入格式规范
text = text.replace("\n", " ")
# 调用 OpenAI API 获取文本的向量表示
return self.client.embeddings.create(input=[text], model=model).data[0].embedding
else:
return ollama.embeddings(model=model, prompt=text).embedding
class ReadFiles:
"""
读取文件的类,用于从指定路径读取支持的文件类型(如 .txt、.md、.pdf)并进行内容分割。
"""
def __init__(self, path: str) -> None:
"""
初始化函数,设定要读取的文件路径,并获取该路径下所有符合要求的文件。
:param path: 文件夹路径
"""
self._path = path
self.file_list = self.get_files() # 获取文件列表
def get_files(self):
"""
遍历指定文件夹,获取支持的文件类型列表(txt, md, pdf)。
:return: 文件路径列表
"""
file_list = []
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 根据文件后缀筛选支持的文件类型
if filename.endswith(".md"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".txt"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150):
"""
读取文件内容并进行分割,将长文本切分为多个块。
:param max_token_len: 每个文档片段的最大 Token 长度
:param cover_content: 在每个片段之间重叠的 Token 长度
:return: 切分后的文档片段列表
"""
docs = []
for file in self.file_list:
content = self.read_file_content(file) # 读取文件内容
# 分割文档为多个小块
chunk_content = self.get_chunk(content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
"""
将文档内容按最大 Token 长度进行切分。
:param text: 文档内容
:param max_token_len: 每个片段的最大 Token 长度
:param cover_content: 重叠的内容长度
:return: 切分后的文档片段列表
"""
chunk_text = []
curr_len = 0
curr_chunk = ''
token_len = max_token_len - cover_content
lines = text.splitlines() # 以换行符分割文本为行
for line in lines:
line = line.replace(' ', '') # 去除空格
enc = tiktoken.get_encoding("cl100k_base")
line_len = len(enc.encode(line)) # 计算当前行的 Token 长度
if line_len > max_token_len:
# 如果单行长度超过限制,将其分割为多个片段
num_chunks = (line_len + token_len - 1) // token_len
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 防止跨单词分割
while not line[start:end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
elif curr_len + line_len <= token_len:
# 当前片段长度未超过限制时,继续累加
curr_chunk += line + '\n'
curr_len += line_len + 1
else:
chunk_text.append(curr_chunk) # 保存当前片段
curr_chunk = curr_chunk[-cover_content:] + line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
"""
读取文件内容,根据文件类型选择不同的读取方式。
:param file_path: 文件路径
:return: 文件内容
"""
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
@classmethod
def read_pdf(cls, file_path: str):
"""
读取 PDF 文件内容。
:param file_path: PDF 文件路径
:return: PDF 文件中的文本内容
"""
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page_num in range(len(reader.pages)):
text += reader.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path: str):
"""
读取 Markdown 文件内容,并将其转换为纯文本。
:param file_path: Markdown 文件路径
:return: 纯文本内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
# 使用 BeautifulSoup 从 HTML 中提取纯文本
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_text(cls, file_path: str):
"""
读取普通文本文件内容。
:param file_path: 文本文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
class Documents:
"""
文档类,用于读取已分好类的 JSON 格式文档。
"""
def __init__(self, path: str = '') -> None:
self.path = path
def get_content(self):
"""
读取 JSON 格式的文档内容。
:return: JSON 文档的内容
"""
with open(self.path, mode='r', encoding='utf-8') as f:
content = json.load(f)
return content
class VectorStore:
def __init__(self, document: List[str] = None) -> None:
"""
初始化向量存储类,存储文档和对应的向量表示。
:param document: 文档列表,默认为空。
"""
if document is None:
document = []
self.document = document # 存储文档内容
self.vectors = [] # 存储文档的向量表示
def get_vector(self, EmbeddingModel: BaseEmbeddings) -> List[List[float]]:
"""
使用传入的 Embedding 模型将文档向量化。
:param EmbeddingModel: 传入的用于生成向量的模型(需继承 BaseEmbeddings 类)。
:return: 返回文档对应的向量列表。
"""
# 遍历所有文档,获取每个文档的向量表示
self.vectors = [EmbeddingModel.get_embedding(doc) for doc in self.document]
return self.vectors
def persist(self, path: str = 'storage'):
"""
将文档和对应的向量表示持久化到本地目录中,以便后续加载使用。
:param path: 存储路径,默认为 'storage'。
"""
if not os.path.exists(path):
os.makedirs(path) # 如果路径不存在,创建路径
# 保存向量为 numpy 文件
np.save(os.path.join(path, 'vectors.npy'), self.vectors)
# 将文档内容存储到文本文件中
with open(os.path.join(path, 'documents.txt'), 'w', encoding='utf-8') as f:
for doc in self.document:
f.write(f"{doc}\n")
def load_vector(self, path: str = 'storage'):
"""
从本地加载之前保存的文档和向量数据。
:param path: 存储路径,默认为 'storage'。
"""
# 加载保存的向量数据
self.vectors = np.load(os.path.join(path, 'vectors.npy')).tolist()
# 加载文档内容
with open(os.path.join(path, 'documents.txt'), 'r', encoding='utf-8') as f:
self.document = [line.strip() for line in f.readlines()]
def get_similarity(self, vector1: List[float], vector2: List[float]) -> float:
"""
计算两个向量的余弦相似度。
:param vector1: 第一个向量。
:param vector2: 第二个向量。
:return: 返回两个向量的余弦相似度,范围从 -1 到 1。
"""
dot_product = np.dot(vector1, vector2)
magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
if not magnitude:
return 0
return dot_product / magnitude
def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 3) -> List[str]:
"""
根据用户的查询文本,检索最相关的文档片段。
:param query: 用户的查询文本。
:param EmbeddingModel: 用于将查询向量化的嵌入模型。
:param k: 返回最相似的文档数量,默认为 1。
:return: 返回最相似的文档列表。
"""
# 将查询文本向量化
query_vector = EmbeddingModel.get_embedding(query)
# 计算查询向量与每个文档向量的相似度
similarities = [self.get_similarity(query_vector, vector) for vector in self.vectors]
# 获取相似度最高的 k 个文档索引
top_k_indices = np.argsort(similarities)[-k:][::-1]
# 返回对应的文档内容
return [self.document[idx] for idx in top_k_indices]
# 初始化 ReadFiles 类,指定文件目录路径
file_reader = ReadFiles(path="./data")
# 获取目录下所有支持的文件类型
file_list = file_reader.get_files()
print("支持的文件列表:", file_list)
# 将文件内容读取并分块
document_chunks = file_reader.get_content(max_token_len=600, cover_content=150)
# 创建向量数据库
vector_store = VectorStore(document=document_chunks)
# 使用 OpenAI Embedding 模型对文档进行向量化
embedding_model = OpenAIEmbedding("", False)
# 获取文档向量并存储
vector_store.get_vector(embedding_model)
# 持久化存储到本地
vector_store.persist('storage')
# 模拟用户查询
query = "人工智能是什么?"
print("问题:", query)
result = vector_store.query(query, embedding_model, 3)
print("检索结果:", result)
版权所有
版权归属:NateHHX