外观
文件读取&切片
约 1525 字大约 5 分钟
一、文档加载
1. 核心作用与流程
文档加载是RAG系统构建知识库的第一步,负责将多源异构数据转换为统一格式的文本内容。主要流程包括:
- 格式解析:支持PDF、Word、HTML、Markdown等20+格式,其中PDF需额外处理版面恢复和表格识别,Word需解析XML结构
- 内容提取:通过OCR技术处理扫描文档,利用NLP工具识别标题、段落、列表等逻辑元素
- 元数据管理:自动记录文档来源、创建时间、作者等信息,便于后续检索溯源
2. 技术挑战与解决方案
- 非结构化处理:PDF文档缺乏语义标签,需结合计算机视觉和NLP进行版面分析
- 多模态支持:处理含表格、公式的文档时,需保持结构化数据的关联性
- 编码兼容:针对不同语言的混合文档,采用UTF-8编码和自动检测机制
二、文本切片
1. 核心策略
(1) 固定尺寸切片
- 按预设字符数(如512 tokens)分割,通过滑动窗口保留上下文连贯性
- 适用于技术文档(800-1200词/块)和对话文本(400-600词/块)
(2) 语义切片
- 利用BERT等模型的NSP能力判断段落衔接性,通过相似度阈值动态合并
- 结合文章分析工具识别主从关系,保证每个切片主题完整
(3) 结构感知切片
- 基于Markdown/LaTeX标题层级划分,保留文档逻辑结构
- 专利方法构建文本树,以目录节点为起点,约束块字数
(4) 混合切片
- 递归切分:先按章节划分,超长部分二次分割并建立父子索引
- 动态调整:根据文本类型选择最优策略,如代码文件按函数分割
2. 优化技术
- 元数据增强:为切片添加摘要、问答对等辅助信息,提升检索精度
- 迟分技术(Late Chunking):先处理全文再生成块嵌入,保留跨块上下文
- 两阶段检索:粗筛后通过交叉编码器重排序,解决语义相似≠相关性问题
三、代码示例
文件读取&切片.py
import os
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
import re
from bs4 import BeautifulSoup
from IPython.display import display, Code, Markdown
class ReadFiles:
"""
读取文件的类,用于从指定路径读取支持的文件类型(如 .txt、.md、.pdf)并进行内容分割。
"""
def __init__(self, path: str) -> None:
"""
初始化函数,设定要读取的文件路径,并获取该路径下所有符合要求的文件。
:param path: 文件夹路径
"""
self._path = path
self.file_list = self.get_files() # 获取文件列表
def get_files(self):
"""
遍历指定文件夹,获取支持的文件类型列表(txt, md, pdf)。
:return: 文件路径列表
"""
file_list = []
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 根据文件后缀筛选支持的文件类型
if filename.endswith(".md"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".txt"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150):
"""
读取文件内容并进行分割,将长文本切分为多个块。
:param max_token_len: 每个文档片段的最大 Token 长度
:param cover_content: 在每个片段之间重叠的 Token 长度
:return: 切分后的文档片段列表
"""
docs = []
for file in self.file_list:
content = self.read_file_content(file) # 读取文件内容
# 分割文档为多个小块
chunk_content = self.get_chunk(content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
"""
将文档内容按最大 Token 长度进行切分。
:param text: 文档内容
:param max_token_len: 每个片段的最大 Token 长度
:param cover_content: 重叠的内容长度
:return: 切分后的文档片段列表
"""
chunk_text = []
curr_len = 0
curr_chunk = ''
token_len = max_token_len - cover_content
lines = text.splitlines() # 以换行符分割文本为行
for line in lines:
line = line.replace(' ', '') # 去除空格
enc = tiktoken.get_encoding("cl100k_base")
line_len = len(enc.encode(line)) # 计算当前行的 Token 长度
if line_len > max_token_len:
# 如果单行长度超过限制,将其分割为多个片段
num_chunks = (line_len + token_len - 1) // token_len
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 防止跨单词分割
while not line[start:end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
elif curr_len + line_len <= token_len:
# 当前片段长度未超过限制时,继续累加
curr_chunk += line + '\n'
curr_len += line_len + 1
else:
chunk_text.append(curr_chunk) # 保存当前片段
curr_chunk = curr_chunk[-cover_content:] + line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
"""
读取文件内容,根据文件类型选择不同的读取方式。
:param file_path: 文件路径
:return: 文件内容
"""
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
@classmethod
def read_pdf(cls, file_path: str):
"""
读取 PDF 文件内容。
:param file_path: PDF 文件路径
:return: PDF 文件中的文本内容
"""
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page_num in range(len(reader.pages)):
text += reader.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path: str):
"""
读取 Markdown 文件内容,并将其转换为纯文本。
:param file_path: Markdown 文件路径
:return: 纯文本内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
# 使用 BeautifulSoup 从 HTML 中提取纯文本
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_text(cls, file_path: str):
"""
读取普通文本文件内容。
:param file_path: 文本文件路径
:return: 文件内容
"""
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
class Documents:
"""
文档类,用于读取已分好类的 JSON 格式文档。
"""
def __init__(self, path: str = '') -> None:
self.path = path
def get_content(self):
"""
读取 JSON 格式的文档内容。
:return: JSON 文档的内容
"""
with open(self.path, mode='r', encoding='utf-8') as f:
content = json.load(f)
return content
# 初始化 ReadFiles 类,指定文件目录路径
file_reader = ReadFiles(path="./data")
# 获取目录下所有支持的文件类型
file_list = file_reader.get_files()
print("支持的文件列表:", file_list)
# 将文件内容读取并分块
document_chunks = file_reader.get_content(max_token_len=600, cover_content=150)
# print("分块后的文档内容:", document_chunks)
print("第一块文档内容:", document_chunks[0])
更新日志
2025/6/26 11:30
查看所有更新日志
dfb81
-update于36bd4
-update于
版权所有
版权归属:NateHHX