外观
训练数据预处理流水线
约 4025 字大约 13 分钟
设计思路与核心概念
1. 训练数据预处理的背景与动机
训练数据预处理流水线是大语言模型训练的基础设施,主要解决以下关键问题:
- 数据格式转换:将原始文本转换为模型可处理的数字序列
- 序列切分:将长文本切分为固定长度的训练样本
- 批量处理:高效地组织数据进行批量训练
- 内存管理:在有限内存下处理大规模文本数据
- 训练效率:通过并行加载和预处理提高训练速度
2. 核心设计思想
训练数据预处理流水线的核心思想是构建高效的数据管道,将原始文本转换为模型训练所需的批量张量:
- 滑动窗口采样:通过滑动窗口从长文本中提取固定长度的训练样本
- 自回归目标:构造下一个token预测任务的输入-目标对
- 批量化处理:将多个样本组织成批次,提高GPU利用率
- 流式处理:支持大规模数据的流式加载和处理
3. 技术架构
数据流水线组成
原始文本 → 分词编码 → 滑动窗口切分 → 样本对构造 → 批量组织 → 嵌入转换 → 模型输入
核心组件
- GPTDatasetV1类:自定义数据集,实现滑动窗口采样
- DataLoader:PyTorch数据加载器,支持批量处理和并行加载
- 分词器:tiktoken GPT-2分词器,将文本转换为token序列
- 嵌入层:词嵌入和位置嵌入,将token转换为向量表示
4. 数学原理
滑动窗口采样
对于长度为N的token序列,窗口大小为L,步长为S:
样本数量=⌊SN−L⌋+1
每个样本i的输入和目标:
inputi=tokens[i×S:i×S+L]
targeti=tokens[i×S+1:i×S+L+1]
嵌入表示
最终的输入表示为词嵌入和位置嵌入的和:
H=Etoken+Epos
其中:
- Etoken∈RB×L×d:词嵌入矩阵
- Epos∈RL×d:位置嵌入矩阵(广播到批次维度)
执行流程
1. 整体执行流程图
2. 详细计算流程图
处理步骤详解
步骤 | 操作描述 | 核心方法 | 输入示例 | 输出示例 |
---|---|---|---|---|
1 | 文本读取 | open().read() | "the-verdict.txt" | 完整文本字符串 |
2 | 分词编码 | tokenizer.encode() | "Hello world" | [15496, 995] |
3 | 滑动窗口 | range(0, len-max_len, stride) | 步长=4, 窗口=4 | 多个窗口位置 |
4 | 样本构造 | tokens[i:i+max_len] | 窗口位置i | 输入序列 |
5 | 目标构造 | tokens[i+1:i+max_len+1] | 窗口位置i+1 | 目标序列 |
6 | 批量组织 | DataLoader | 8个样本 | (8,4)批次张量 |
7 | 词嵌入 | embedding_layer(inputs) | token IDs | (8,4,256)张量 |
8 | 位置嵌入 | pos_embedding + token_emb | 位置索引 | 最终表示 |
完整代码实现
训练数据预处理流水线.py
# -------------------------- 数据预处理模块 --------------------------
# 导入OpenAI官方分词库,支持GPT系列模型的快速分词
import tiktoken
# PyTorch核心库和数据集工具
import torch
from torch.utils.data import Dataset, DataLoader
# 自定义GPT训练数据集类(继承PyTorch Dataset)
class GPTDatasetV1(Dataset):
def __init__(self, txt, tokenizer, max_length, stride):
# 存储输入序列和目标序列的列表(每个元素为LongTensor)
self.input_ids = []
self.target_ids = []
# 将原始文本转换为token ID序列(例如:"hello" -> [31373])
token_ids = tokenizer.encode(txt)
# 滑动窗口采样:以stride为步长,生成连续的训练样本
# 例如:max_length=512时,每个样本包含512个token作为输入和对应的目标
for i in range(0, len(token_ids) - max_length, stride):
# 输入窗口:从i开始取max_length个token
input_chunk = token_ids[i:i + max_length]
# 目标窗口:相对输入窗口后移1位(实现next-token预测任务)
target_chunk = token_ids[i + 1: i + max_length + 1]
# 转换为Tensor并存储(后续自动批处理需要Tensor格式)
self.input_ids.append(torch.tensor(input_chunk, dtype=torch.long))
self.target_ids.append(torch.tensor(target_chunk, dtype=torch.long))
# 返回数据集总样本数(用于自动分批)
def __len__(self):
return len(self.input_ids)
# 获取单个训练样本(输入序列 + 目标序列)
def __getitem__(self, idx):
return self.input_ids[idx], self.target_ids[idx] # 返回元组形式
# -------------------------- 数据流水线构建 --------------------------
def create_dataloader_v1(txt, batch_size=4, max_length=256, stride=128,
shuffle=True, drop_last=True, num_workers=0):
# 初始化GPT-2官方分词器(适配模型预训练时的分词方式)
tokenizer = tiktoken.get_encoding("gpt2")
# 实例化数据集(完成滑动窗口切分和token转换)
dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)
# 构造高效数据加载器(支持多线程加载和自动批处理)
dataloader = DataLoader(
dataset,
batch_size=batch_size, # 每批样本数(影响内存占用和并行效率)
shuffle=shuffle, # 是否随机打乱(训练时建议开启)
drop_last=drop_last, # 丢弃最后不完整的批次(保持批次形状一致)
num_workers=num_workers # 数据预加载线程数(加速大规模数据集处理)
)
return dataloader
# -------------------------- 数据加载 --------------------------
# 读取原始文本文件(假设文本为英文小说或文章)
with open("the-verdict.txt", "r", encoding="utf-8") as f:
raw_text = f.read() # 获取完整文本内容字符串
# 设置模型上下文窗口长度(即最大输入token数)
max_length = 4 # 示例使用小窗口便于调试,实际值通常为512/1024等
# 构建数据加载器(演示配置)
dataloader = create_dataloader_v1(
raw_text,
batch_size=8, # 每个批次包含8个独立样本序列
max_length=max_length, # 每个样本序列长度为4个token
stride=max_length, # 窗口滑动步长等于窗口长度(样本无重叠)
shuffle=False # 禁止打乱顺序(便于调试观察原始样本)
)
# -------------------------- 神经网络模块 --------------------------
# 创建数据迭代器(用于遍历数据批次)
data_iter = iter(dataloader)
# 获取第一个批次的数据(输入序列和目标序列)
inputs, targets = next(data_iter)
# 设置GPT模型的词汇表大小(GPT-2标准配置)
vocab_size = 50257 # 包含特殊token(如<|endoftext|>)的词汇总量
# 定义词向量维度(影响模型容量和计算复杂度)
output_dim = 256 # 每个token映射到256维的连续向量空间
# 构建可训练的词嵌入矩阵(核心组件:将离散token映射为连续向量)
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
"""
数学形式:
embeddings = token_embeddings(input_ids) # (batch, seq_len) -> (batch, seq_len, dim)
其中:
input_ids中的每个整数索引对应嵌入矩阵中的一行向量
"""
# 将输入token转换为词嵌入向量
token_embeddings = token_embedding_layer(inputs)
print(f"词嵌入张量形状: {token_embeddings.shape}") # 预期 (8,4,256)
# 定义位置编码层(为序列中的每个位置生成嵌入向量)
context_length = max_length # 位置编码长度需匹配输入序列长度
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)
# 生成位置嵌入(为0到context_length-1的位置生成向量)
pos_embeddings = pos_embedding_layer(torch.arange(context_length))
print(f"位置嵌入形状: {pos_embeddings.shape}") # (4,256) 可广播到每个批次
# 融合词嵌入与位置信息(Transformer的经典输入处理方式)
input_embeddings = token_embeddings + pos_embeddings # 逐元素相加
print(f"最终输入表示形状: {input_embeddings.shape}") # 保持 (8,4,256)
代码详细解析
1. GPTDatasetV1类设计
类架构分析
class GPTDatasetV1(Dataset):
def __init__(self, txt, tokenizer, max_length, stride):
# 数据预处理和存储
def __len__(self):
# 返回数据集大小
def __getitem__(self, idx):
# 获取单个样本
设计要点:
- 继承Dataset:符合PyTorch数据处理规范,支持自动批处理
- 滑动窗口:通过stride参数控制样本重叠程度
- 内存预加载:将所有样本预先处理并存储在内存中
- 张量格式:直接存储为torch.Tensor,避免运行时转换开销
核心参数分析
参数 | 类型 | 作用 | 典型值 |
---|---|---|---|
txt | str | 原始训练文本 | 完整文档内容 |
tokenizer | Encoding | 分词器对象 | tiktoken GPT-2 |
max_length | int | 序列最大长度 | 512, 1024, 2048 |
stride | int | 滑动窗口步长 | max_length//2 |
2. 滑动窗口采样机制
窗口切分逻辑
for i in range(0, len(token_ids) - max_length, stride):
input_chunk = token_ids[i:i + max_length]
target_chunk = token_ids[i + 1: i + max_length + 1]
切分策略:
- 起始位置:从0开始,每次移动stride个位置
- 终止条件:确保每个窗口都有完整的max_length个token
- 目标偏移:目标序列相对输入序列右移1位,实现next-token预测
窗口重叠示例
# 假设token_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# max_length = 4, stride = 2
# 样本1: input=[1,2,3,4], target=[2,3,4,5]
# 样本2: input=[3,4,5,6], target=[4,5,6,7]
# 样本3: input=[5,6,7,8], target=[6,7,8,9]
# 样本4: input=[7,8,9,10], target=[8,9,10,?] # 需要足够的token
重叠优势:
- 数据增强:从有限文本中生成更多训练样本
- 上下文连续性:保持文本的局部连续性
- 训练稳定性:提供更多的梯度更新机会
3. DataLoader配置详解
create_dataloader_v1函数分析
def create_dataloader_v1(txt, batch_size=4, max_length=256, stride=128,
shuffle=True, drop_last=True, num_workers=0):
参数配置策略:
batch_size
:影响内存使用和梯度稳定性shuffle
:训练时开启,验证时关闭drop_last
:保持批次大小一致,避免最后一个小批次num_workers
:多进程加载,加速大数据集处理
DataLoader高级配置
# 内存优化配置
dataloader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
pin_memory=True, # 加速GPU传输
persistent_workers=True, # 保持worker进程
prefetch_factor=2 # 预取批次数量
)
4. 嵌入层实现详解
词嵌入层设计
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
参数规模:
- 权重矩阵:(50257, 256) = 12.87M参数
- 内存占用:约51MB(float32)
- 查找复杂度:O(1),直接索引访问
位置嵌入层设计
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)
设计特点:
- 学习式位置编码:相比固定正弦编码更灵活
- 上下文长度限制:最大支持context_length个位置
- 广播机制:自动广播到批次维度
5. 数据流转分析
数据形状变换
# 原始文本 -> 分词
raw_text: str -> token_ids: List[int]
# 滑动窗口 -> 样本对
token_ids: [N] -> samples: [(max_length,), (max_length,)] * num_samples
# 批量组织 -> 批次张量
samples: List[Tuple] -> batch: (batch_size, max_length)
# 嵌入转换 -> 向量表示
batch: (batch_size, max_length) -> embeddings: (batch_size, max_length, output_dim)
内存使用分析
def estimate_memory_usage(vocab_size, output_dim, batch_size, max_length):
"""估算内存使用量"""
# 词嵌入层
token_emb_params = vocab_size * output_dim * 4 # bytes (float32)
# 位置嵌入层
pos_emb_params = max_length * output_dim * 4
# 批次数据
batch_data = batch_size * max_length * 4 # input ids
batch_embeddings = batch_size * max_length * output_dim * 4
total_mb = (token_emb_params + pos_emb_params + batch_data + batch_embeddings) / (1024**2)
return {
"token_embedding_mb": token_emb_params / (1024**2),
"pos_embedding_mb": pos_emb_params / (1024**2),
"batch_data_mb": batch_data / (1024**2),
"batch_embeddings_mb": batch_embeddings / (1024**2),
"total_mb": total_mb
}
高级特性与扩展
1. 动态批处理优化
变长序列处理
class DynamicGPTDataset(Dataset):
"""支持变长序列的动态数据集"""
def __init__(self, texts, tokenizer, max_length):
self.samples = []
for text in texts:
tokens = tokenizer.encode(text)
# 截断或填充到max_length
if len(tokens) > max_length:
tokens = tokens[:max_length]
else:
tokens.extend([tokenizer.eot_token] * (max_length - len(tokens)))
self.samples.append(tokens)
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
tokens = self.samples[idx]
return torch.tensor(tokens[:-1]), torch.tensor(tokens[1:])
智能批处理
def collate_fn(batch):
"""自定义批处理函数,支持动态填充"""
inputs, targets = zip(*batch)
# 找到批次中的最大长度
max_len = max(len(seq) for seq in inputs)
# 填充到相同长度
padded_inputs = []
padded_targets = []
for inp, tgt in zip(inputs, targets):
pad_len = max_len - len(inp)
padded_inputs.append(torch.cat([inp, torch.zeros(pad_len, dtype=torch.long)]))
padded_targets.append(torch.cat([tgt, torch.zeros(pad_len, dtype=torch.long)]))
return torch.stack(padded_inputs), torch.stack(padded_targets)
2. 大规模数据处理
流式数据加载
class StreamingGPTDataset(Dataset):
"""流式数据集,支持大规模文本处理"""
def __init__(self, file_paths, tokenizer, max_length, stride):
self.file_paths = file_paths
self.tokenizer = tokenizer
self.max_length = max_length
self.stride = stride
self._build_index()
def _build_index(self):
"""构建文件索引,避免全部加载到内存"""
self.file_index = []
for file_path in self.file_paths:
with open(file_path, 'r') as f:
text = f.read()
tokens = self.tokenizer.encode(text)
num_samples = (len(tokens) - self.max_length) // self.stride + 1
for i in range(num_samples):
self.file_index.append((file_path, i * self.stride))
def __len__(self):
return len(self.file_index)
def __getitem__(self, idx):
file_path, start_pos = self.file_index[idx]
# 动态加载文件片段
with open(file_path, 'r') as f:
text = f.read()
tokens = self.tokenizer.encode(text)
input_tokens = tokens[start_pos:start_pos + self.max_length]
target_tokens = tokens[start_pos + 1:start_pos + self.max_length + 1]
return torch.tensor(input_tokens), torch.tensor(target_tokens)
分布式数据处理
from torch.utils.data.distributed import DistributedSampler
def create_distributed_dataloader(dataset, batch_size, num_replicas, rank):
"""创建分布式数据加载器"""
sampler = DistributedSampler(
dataset,
num_replicas=num_replicas,
rank=rank,
shuffle=True
)
dataloader = DataLoader(
dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=4,
pin_memory=True
)
return dataloader
3. 数据增强技术
随机掩码
class MaskedGPTDataset(GPTDatasetV1):
"""支持随机掩码的数据集"""
def __init__(self, txt, tokenizer, max_length, stride, mask_prob=0.15):
super().__init__(txt, tokenizer, max_length, stride)
self.mask_prob = mask_prob
self.mask_token = tokenizer.encode("<|mask|>")[0] # 假设有掩码token
def __getitem__(self, idx):
input_ids, target_ids = super().__getitem__(idx)
# 随机掩码
mask = torch.rand(input_ids.shape) < self.mask_prob
input_ids[mask] = self.mask_token
return input_ids, target_ids
噪声注入
class NoisyGPTDataset(GPTDatasetV1):
"""支持噪声注入的数据集"""
def __init__(self, txt, tokenizer, max_length, stride, noise_prob=0.1):
super().__init__(txt, tokenizer, max_length, stride)
self.noise_prob = noise_prob
self.vocab_size = tokenizer.n_vocab
def __getitem__(self, idx):
input_ids, target_ids = super().__getitem__(idx)
# 随机替换token
noise_mask = torch.rand(input_ids.shape) < self.noise_prob
random_tokens = torch.randint(0, self.vocab_size, input_ids.shape)
input_ids[noise_mask] = random_tokens[noise_mask]
return input_ids, target_ids
4. 性能监控工具
数据加载性能分析
import time
from collections import defaultdict
class DataLoaderProfiler:
"""数据加载器性能分析工具"""
def __init__(self, dataloader):
self.dataloader = dataloader
self.stats = defaultdict(list)
def profile(self, num_batches=100):
"""分析数据加载性能"""
start_time = time.time()
for i, (inputs, targets) in enumerate(self.dataloader):
if i >= num_batches:
break
batch_start = time.time()
# 模拟数据处理
_ = inputs.shape, targets.shape
batch_time = time.time() - batch_start
self.stats['batch_time'].append(batch_time)
self.stats['batch_size'].append(inputs.shape[0])
total_time = time.time() - start_time
return {
'total_time': total_time,
'avg_batch_time': sum(self.stats['batch_time']) / len(self.stats['batch_time']),
'throughput_samples_per_sec': sum(self.stats['batch_size']) / total_time,
'num_batches': len(self.stats['batch_time'])
}
内存使用监控
import psutil
import torch
def monitor_memory_usage(dataloader, num_batches=10):
"""监控内存使用情况"""
process = psutil.Process()
memory_stats = []
for i, (inputs, targets) in enumerate(dataloader):
if i >= num_batches:
break
# 记录内存使用
memory_info = process.memory_info()
gpu_memory = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0
memory_stats.append({
'batch': i,
'cpu_memory_mb': memory_info.rss / (1024**2),
'gpu_memory_mb': gpu_memory / (1024**2),
'batch_size': inputs.shape[0],
'sequence_length': inputs.shape[1]
})
return memory_stats
5. 配置管理
数据处理配置类
from dataclasses import dataclass
from typing import Optional
@dataclass
class DataConfig:
"""数据处理配置"""
# 文件路径
train_file: str
val_file: Optional[str] = None
# 分词器配置
tokenizer_name: str = "gpt2"
# 序列配置
max_length: int = 512
stride: int = 256
# 批处理配置
batch_size: int = 32
shuffle: bool = True
drop_last: bool = True
num_workers: int = 4
# 嵌入配置
vocab_size: int = 50257
embedding_dim: int = 768
# 性能配置
pin_memory: bool = True
persistent_workers: bool = True
def create_dataloader_from_config(config: DataConfig):
"""根据配置创建数据加载器"""
with open(config.train_file, 'r') as f:
text = f.read()
return create_dataloader_v1(
text,
batch_size=config.batch_size,
max_length=config.max_length,
stride=config.stride,
shuffle=config.shuffle,
drop_last=config.drop_last,
num_workers=config.num_workers
)
实际应用场景
1. 预训练数据处理
- 大规模文本:处理TB级别的预训练语料
- 多文档拼接:将多个文档拼接成长序列
- 质量过滤:过滤低质量和重复内容
2. 微调数据处理
- 任务特定格式:适配不同下游任务的数据格式
- 少样本学习:处理小规模高质量数据
- 多轮对话:处理对话历史和上下文
3. 推理数据处理
- 实时处理:支持在线推理的快速数据处理
- 批量推理:优化批量推理的数据组织
- 流式生成:支持流式文本生成的数据管理
实践建议
1. 参数调优指南
- max_length:根据GPU内存和任务需求选择
- stride:平衡数据增强和计算效率
- batch_size:考虑内存限制和训练稳定性
- num_workers:根据CPU核心数和I/O性能调整
2. 性能优化策略
- 数据预处理:预先处理和缓存分词结果
- 内存管理:使用pin_memory和persistent_workers
- 并行加载:合理设置num_workers数量
- GPU利用率:优化批次大小和序列长度
3. 调试技巧
- 小规模测试:先用小数据集验证流水线
- 形状检查:验证每个阶段的张量形状
- 性能监控:监控数据加载速度和内存使用
- 可视化验证:检查分词和样本构造的正确性
更新日志
2025/8/18 00:31
查看所有更新日志
bd1d0
-迁移目录于b0f2a
-docs: 完善大模型学习文档 - 增加设计思路与执行流程于dfb81
-update于dc6b2
-update于
版权所有
版权归属:NateHHX