外观
文件读取&切片
约 1517 字大约 5 分钟
2025-04-07
一、文档加载
1. 核心作用与流程
文档加载是RAG系统构建知识库的第一步,负责将多源异构数据转换为统一格式的文本内容。主要流程包括:
- 格式解析:支持PDF、Word、HTML、Markdown等20+格式,其中PDF需额外处理版面恢复和表格识别,Word需解析XML结构
- 内容提取:通过OCR技术处理扫描文档,利用NLP工具识别标题、段落、列表等逻辑元素
- 元数据管理:自动记录文档来源、创建时间、作者等信息,便于后续检索溯源
2. 技术挑战与解决方案
- 非结构化处理:PDF文档缺乏语义标签,需结合计算机视觉和NLP进行版面分析
- 多模态支持:处理含表格、公式的文档时,需保持结构化数据的关联性
- 编码兼容:针对不同语言的混合文档,采用UTF-8编码和自动检测机制
二、文本切片
1. 核心策略
(1) 固定尺寸切片
- 按预设字符数(如512 tokens)分割,通过滑动窗口保留上下文连贯性
- 适用于技术文档(800-1200词/块)和对话文本(400-600词/块)
(2) 语义切片
- 利用BERT等模型的NSP能力判断段落衔接性,通过相似度阈值动态合并
- 结合文章分析工具识别主从关系,保证每个切片主题完整
(3) 结构感知切片
- 基于Markdown/LaTeX标题层级划分,保留文档逻辑结构
- 专利方法构建文本树,以目录节点为起点,约束块字数
(4) 混合切片
- 递归切分:先按章节划分,超长部分二次分割并建立父子索引
- 动态调整:根据文本类型选择最优策略,如代码文件按函数分割
2. 优化技术
- 元数据增强:为切片添加摘要、问答对等辅助信息,提升检索精度
- 迟分技术(Late Chunking):先处理全文再生成块嵌入,保留跨块上下文
- 两阶段检索:粗筛后通过交叉编码器重排序,解决语义相似≠相关性问题
三、代码示例
import os
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
import re
from bs4 import BeautifulSoup
from IPython.display import display, Code, Markdown
class ReadFiles:
"""
读取文件的类,用于从指定路径读取支持的文件类型(如 .txt、.md、.pdf)并进行内容分割。
"""
def __init__(self, path: str) -> None:
"""
初始化函数,设定要读取的文件路径,并获取该路径下所有符合要求的文件。
:param path: 文件夹路径
"""
self._path = path
self.file_list = self.get_files() # 获取文件列表
def get_files(self):
"""
遍历指定文件夹,获取支持的文件类型列表(txt, md, pdf)。
:return: 文件路径列表
"""
file_list = []
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 根据文件后缀筛选支持的文件类型
if filename.endswith(".md"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".txt"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150):
"""
读取文件内容并进行分割,将长文本切分为多个块。
:param max_token_len: 每个文档片段的最大 Token 长度
:param cover_content: 在每个片段之间重叠的 Token 长度
:return: 切分后的文档片段列表
"""
docs = []
for file in self.file_list:
content = self.read_file_content(file) # 读取文件内容
# 分割文档为多个小块
chunk_content = self.get_chunk(content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
"""
将文档内容按最大 Token 长度进行切分。
:param text: 文档内容
:param max_token_len: 每个片段的最大 Token 长度
:param cover_content: 重叠的内容长度
:return: 切分后的文档片段列表
"""
chunk_text = []
curr_len = 0
curr_chunk = ''
token_len = max_token_len - cover_content
lines = text.splitlines() # 以换行符分割文本为行
for line in lines:
line = line.replace(' ', '') # 去除空格
enc = tiktoken.get_encoding("cl100k_base")
line_len = len(enc.encode(line)) # 计算当前行的 Token 长度
if line_len > max_token_len:
# 如果单行长度超过限制,将其分割为多个片段
num_chunks = (line_len + token_len - 1) // token_len
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 防止跨单词分割
while not line[start:end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
elif curr_len + line_len <= token_len:
# 当前片段长度未超过限制时,继续累加
curr_chunk += line + '\n'
curr_len += line_len + 1
else:
chunk_text.append(curr_chunk) # 保存当前片段
curr_chunk = curr_chunk[-cover_content:] + line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
"""
读取文件内容,根据文件类型选择不同的读取方式。
:param file_path: 文件路径
:return: 文件内容
"""
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
@classmethod
def read_pdf(cls, file_path: str):
"""
读取 PDF 文件内容。
:param file_path: PDF 文件路径
:return: PDF 文件中的文本内容
"""
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page_num in range(len(reader.pages)):
text += reader.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path: str):
"""
读取 Markdown 文件内容,并将其转换为纯文本。
:param file_path: Markdown 文件路径
:return: 纯文本内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
# 使用 BeautifulSoup 从 HTML 中提取纯文本
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_text(cls, file_path: str):
"""
读取普通文本文件内容。
:param file_path: 文本文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
class Documents:
"""
文档类,用于读取已分好类的 JSON 格式文档。
"""
def __init__(self, path: str = '') -> None:
self.path = path
def get_content(self):
"""
读取 JSON 格式的文档内容。
:return: JSON 文档的内容
"""
with open(self.path, mode='r', encoding='utf-8') as f:
content = json.load(f)
return content
# 初始化 ReadFiles 类,指定文件目录路径
file_reader = ReadFiles(path="./data")
# 获取目录下所有支持的文件类型
file_list = file_reader.get_files()
print("支持的文件列表:", file_list)
# 将文件内容读取并分块
document_chunks = file_reader.get_content(max_token_len=600, cover_content=150)
# print("分块后的文档内容:", document_chunks)
print("第一块文档内容:", document_chunks[0])
版权所有
版权归属:NateHHX