图解分布式训练(七)—— Accelerate 分布式训练详解
来源:AiGC面试宝典 作者:宁静致远 日期:2023年09月29日
一、为什么需要 Accelerate 分布式训练?
PyTorch Accelerate 是由 Hugging Face、NVIDIA、AWS 和 Microsoft 等公司联合开发的 PyTorch 加速工具包,旨在简化分布式训练和推理的开发过程,并提高性能。
📝通俗解释:Accelerate 就像是一个"训练加速器",它把分布式训练中复杂的技术细节都封装起来了,让开发者只需要几行代码就能实现多GPU训练,不需要自己手写复杂的通信逻辑。
二、什么是 Accelerate 分布式训练?
2.1 Accelerate 分布式训练介绍
PyTorch Accelerate 提供了一组简单易用的 API,帮助开发者实现以下功能:
- 分布式训练:在多个 GPU 或多台机器上并行训练模型
- 混合精度训练:使用半精度浮点数(FP16)加速训练,减少显存占用
- 自动调参:集成 PyTorch Lightning,支持自动超参数调整
- 数据加载优化:优化 DataLoader,提升数据加载效率
- 模型优化:支持 Apex 和 TorchScript 等优化工具
📝通俗解释:Accelerate 就像一个"全能教练",不仅帮你把训练任务分配到多个GPU上(分布式训练),还能用更省内存的方式训练(混合精度),同时帮你自动调整学习率等参数(自动调参)。
2.2 Accelerate 主要优势
| 特性 | 说明 |
|---|---|
| 分布式训练 | 多GPU/多机器并行,缩短训练时间 |
| 混合精度训练 | FP16加速,减少显存使用 |
| 自动调参 | 自动调整超参数,提高模型性能 |
| 数据加载优化 | 优化数据加载,减少等待时间 |
| 模型优化 | 支持 Apex、TorchScript 等工具 |
📝通俗解释:想象一下你要完成一项大工程(训练大模型),Accelerate 帮你找来了多个工人(GPU),每人负责一部分工作(分布式),而且还教他们用更高效的方法(混合精度),这就是它的核心价值。
三、Accelerate 分布式训练原理
3.1 分布式训练概念
分布式训练是指将训练数据拆分到多个计算设备上并行处理,最后汇总结果得到完整模型。这种方式可以:
- 大幅缩短训练时间:多个GPU同时计算
- 支持更大batch size:数据分散到多个GPU,单个GPU显存压力减小
- 处理超大规模数据:突破单卡显存限制
📝通俗解释:就像一个班级要抄写1000道题目答案,如果只有一个人抄,可能要很久。但如果把题目分成10份,10个人同时抄,就能快10倍。分布式训练就是这个道理。
3.2 加速策略
Accelerate 支持多种分布式训练策略:
3.2.1 Pipeline 并行(流水线并行)
将模型按层拆分,不同计算设备负责不同层的计算。
- 优点:每个设备只需加载部分模型,突破单卡显存限制
- 缺点:设备间存在依赖关系,可能产生等待空闲时间
设备1: 层1 → 层2 → 层3 → |→ 设备2: 层4 → 层5 → 层6 → |→ 设备3: 层7 → 输出📝通俗解释:想象一个生产流水线,第一个工人做完零件交给第二个,第二个做完交给第三个。Pipeline并行就是这个原理,但缺点是如果第一个工人慢了,后面的人就要等着。
3.2.2 Data Parallel(数据并行)
每个计算设备都持有完整的模型副本,但处理不同的数据。
- 优点:实现简单,所有设备都能完整参与计算
- 缺点:每个设备都需要完整模型副本,显存压力大
设备1: 完整模型 + 数据A → 输出1
设备2: 完整模型 + 数据B → 输出2
设备3: 完整模型 + 数据C → 输出3
... 然后汇总所有输出📝通俗解释:就像10个人每人手里都有一本完整的教材,但每个人只做不同的题目。最后把大家的答案汇总起来,就是完整的结果。这是目前最常用的分布式训练方式。
3.2.3 加速器(Accelerator)
指用于加速深度学习的硬件设备,如 GPU、TPU 等。Accelerate 能自动检测并利用可用的加速设备。
📝通俗解释:加速器就是"计算引擎",比如NVIDIA的GPU。Accelerate会自动检测电脑里有多少个GPU,然后合理分配任务。
四、Accelerate 分布式训练实践
4.1 环境安装
pip install accelerate==0.17.1📝通俗解释:安装这个库就像给电脑装了一个"训练插件",安装完后就能用简单的几行代码实现多GPU训练。
4.2 核心代码实现
4.2.1 导入必要的库
import json
import time
import random
import torch
import torch.nn as nn
import numpy as np
import torch.distributed as dist
from sklearn.metrics import classification_report
from accelerate import Accelerator
from torch.utils.data import DataLoader
from transformers import BertForSequenceClassification, BertTokenizer, BertConfig, AdamW4.2.2 设置随机种子(保证实验可重现)
def set_seed(seed=123):
"""设置随机数种子,保证实验可重现"""
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)
torch.cuda.manual_seed_all(seed)📝通俗解释:设置随机种子就像"固定洗牌顺序",保证每次训练的结果是一样的,这样方便我们比较不同参数的效果。
4.2.3 数据处理
def get_data():
with open("data/train.json", "r", encoding="utf-8") as fp:
data = fp.read()
data = json.loads(data)
return data
def load_data():
data = get_data()
return_data = []
for d in data:
text = d[0]
label = d[1]
return_data.append(("".join(text.split(" ")).strip(), label))
return return_data4.2.4 自定义 DataLoader 批处理函数
class Collate:
def __init__(self, tokenizer, max_seq_len):
self.tokenizer = tokenizer
self.max_seq_len = max_seq_len
def collate_fn(self, batch):
input_ids_all = []
token_type_ids_all = []
attention_mask_all = []
label_all = []
for data in batch:
text = data[0]
label = data[1]
inputs = self.tokenizer.encode_plus(
text=text,
max_length=self.max_seq_len,
padding="max_length",
truncation="longest_first",
return_attention_mask=True,
return_token_type_ids=True
)
input_ids_all.append(inputs["input_ids"])
token_type_ids_all.append(inputs["token_type_ids"])
attention_mask_all.append(inputs["attention_mask"])
label_all.append(label)
return {
"input_ids": torch.tensor(input_ids_all, dtype=torch.long),
"attention_mask": torch.tensor(attention_mask_all, dtype=torch.long),
"token_type_ids": torch.tensor(token_type_ids_all, dtype=torch.long),
"label": torch.tensor(label_all, dtype=torch.long)
}📝通俗解释:这段代码做的事情就是把原始文本转换成模型能看懂的数字格式(tokenize),同时把不同长度的句子padding成统一长度。
4.2.5 训练器类定义
class Trainer:
def __init__(self, args, config, model_engine, criterion, optimizer, accelerator):
self.args = args
self.config = config
self.model_engine = model_engine
self.criterion = criterion
self.optimizer = optimizer
self.accelerator = accelerator
def on_step(self, batch_data):
"""单步前向传播"""
label = batch_data["label"].cuda()
input_ids = batch_data["input_ids"].cuda()
token_type_ids = batch_data["token_type_ids"].cuda()
attention_mask = batch_data["attention_mask"].cuda()
output = self.model_engine(
input_ids=input_ids,
token_type_ids=token_type_ids,
attention_mask=attention_mask,
labels=label
)
logits = output[1]
return logits, label
def loss_reduce(self, loss):
"""多GPUloss汇总"""
rt = loss.clone()
dist.all_reduce(rt, op=dist.ReduceOp.SUM)
rt /= torch.cuda.device_count()
return rt
def output_reduce(self, outputs, targets):
"""多GPU输出和标签汇总"""
# 聚合所有GPU的输出
output_gather_list = [torch.zeros_like(outputs) for _ in range(torch.cuda.device_count())]
dist.all_gather(output_gather_list, outputs)
outputs = torch.cat(output_gather_list, dim=0)
# 聚合所有GPU的标签
target_gather_list = [torch.zeros_like(targets) for _ in range(torch.cuda.device_count())]
dist.all_gather(target_gather_list, targets)
targets = torch.cat(target_gather_list, dim=0)
return outputs, targets📝通俗解释:分布式训练时,每个GPU独立计算自己的数据,最后需要把所有GPU的结果汇总起来(reduce和gather操作),这样才能计算整体的loss和评估指标。
4.2.6 训练、验证和测试方法
def train(self, train_loader, dev_loader=None):
global_step = 1
best_acc = 0.0
if self.args.local_rank == 0:
start = time.time()
for epoch in range(1, self.args.epochs + 1):
for step, batch_data in enumerate(train_loader):
self.model_engine.train()
logits, label = self.on_step(batch_data)
loss = self.criterion(logits, label)
# Accelerate 自动处理梯度反向传播
self.accelerator.backward(loss)
self.optimizer.step()
self.optimizer.zero_grad()
# 汇总所有GPU的loss
loss = self.loss_reduce(loss)
if self.args.local_rank == 0:
print("【train】 epoch: {}/{} step: {}/{} loss: {:.6f}".format(
epoch, self.args.epochs, global_step, self.args.total_step, loss
))
global_step += 1
# 验证阶段
if self.args.dev and global_step % self.args.eval_step == 0:
loss, accuracy = self.dev(dev_loader)
if self.args.local_rank == 0:
print("【dev】 loss: {:.6f} accuracy: {:.4f}".format(loss, accuracy))
if accuracy > best_acc:
best_acc = accuracy
print("【best accuracy】 {:.4f}".format(best_acc))
torch.save(self.model_engine.state_dict(), self.args.ckpt_path)
if self.args.local_rank == 0:
end = time.time()
print("耗时:{}分钟".format((end - start) / 60))
if not self.args.dev and self.args.local_rank == 0:
torch.save(self.model_engine.state_dict(), self.args.ckpt_path)
def dev(self, dev_loader):
"""验证方法"""
self.model_engine.eval()
correct_total = 0
num_total = 0
loss_total = 0.0
with torch.no_grad():
for step, batch_data in enumerate(dev_loader):
logits, label = self.on_step(batch_data)
loss = self.criterion(logits, label)
loss = self.loss_reduce(loss)
logits, label = self.output_reduce(logits, label)
loss_total += loss
logits = logits.detach().cpu().numpy()
label = label.view(-1).detach().cpu().numpy()
num_total += len(label)
preds = np.argmax(logits, axis=1).flatten()
correct_num = (preds == label).sum()
correct_total += correct_num
return loss_total, correct_total / num_total
def test(self, model_engine, test_loader, labels):
"""测试方法"""
self.model_engine = model_engine
self.model_engine.eval()
preds = []
trues = []
with torch.no_grad():
for step, batch_data in enumerate(test_loader):
logits, label = self.on_step(batch_data)
logits, label = self.output_reduce(logits, label)
label = label.view(-1).detach().cpu().numpy().tolist()
logits = logits.detach().cpu().numpy()
pred = np.argmax(logits, axis=1).flatten().tolist()
trues.extend(label)
preds.extend(pred)
report = classification_report(trues, preds, target_names=labels)
return report📝通俗解释:训练循环中
accelerator.backward(loss)是关键,它会自动处理分布式环境下的梯度同步,不需要我们手动编写复杂的通信代码。
4.2.7 优化器构建
def build_optimizer(model, args):
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
{'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
'weight_decay': args.weight_decay},
{'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
'weight_decay': 0.0}
]
optimizer = AdamW(optimizer_grouped_parameters, lr=args.learning_rate)
return optimizer📝通俗解释:这里对模型的偏置项和归一化层不施加权重衰减(L2正则化),因为这些参数通常不需要正则化。
4.2.8 参数配置类
class Args:
model_path = "model_hub/chinese-bert-wwm-ext"
ckpt_path = "output/accelerate/multi-gpu-accelerate-cls.pt"
max_seq_len = 128
ratio = 0.92
epochs = 1
eval_step = 50
dev = False
local_rank = None
train_batch_size = 32
dev_batch_size = 32
weight_decay = 0.01
learning_rate = 3e-54.2.9 主函数
def main():
# ================================
# 1. 初始化设置
set_seed()
label2id = {
"其他": 0,
"喜好": 1,
"悲伤": 2,
"厌恶": 3,
"愤怒": 4,
"高兴": 5,
}
args = Args()
tokenizer = BertTokenizer.from_pretrained(args.model_path)
# ================================
# 2. 加载数据集
data = load_data()
data = data[:10000] # 取1万条数据
random.shuffle(data)
train_num = int(len(data) * args.ratio)
train_data = data[:train_num]
dev_data = data[train_num:]
collate = Collate(tokenizer, args.max_seq_len)
train_loader = DataLoader(
train_data,
batch_size=args.train_batch_size,
shuffle=True,
num_workers=2,
collate_fn=collate.collate_fn
)
total_step = len(train_loader) * args.epochs // torch.cuda.device_count()
args.total_step = total_step
dev_loader = DataLoader(
dev_data,
batch_size=args.dev_batch_size,
shuffle=False,
num_workers=2,
collate_fn=collate.collate_fn
)
test_loader = dev_loader
# ================================
# 3. 定义模型、优化器、损失函数
config = BertConfig.from_pretrained(args.model_path, num_labels=6)
model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
model.cuda()
criterion = torch.nn.CrossEntropyLoss()
optimizer = build_optimizer(model, args)
# ================================
# 4. 初始化 Accelerator(核心步骤)
accelerator = Accelerator()
args.local_rank = int(dist.get_rank())
print(args.local_rank)
# prepare 方法会自动处理分布式训练的各项设置
model_engine, optimizer_engine, train_loader_engine, dev_loader_engine = accelerator.prepare(
model, optimizer, train_loader, dev_loader
)
# ================================
# 5. 创建训练器并开始训练
trainer = Trainer(
args, config, model_engine, criterion, optimizer_engine, accelerator
)
trainer.train(train_loader_engine, dev_loader_engine)
# ================================
# 6. 测试阶段
labels = list(label2id.keys())
config = BertConfig.from_pretrained(args.model_path, num_labels=6)
model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
model.cuda()
# 需要重新初始化引擎以加载模型
model_engine, optimizer_engine, train_loader_engine, dev_loader_engine = accelerator.prepare(
model, optimizer, train_loader, dev_loader
)
model_engine.load_state_dict(torch.load(args.ckpt_path))
report = trainer.test(model_engine, test_loader, labels)
if args.local_rank == 0:
print(report)
if __name__ == '__main__':
main()📝通俗解释:整个流程的核心是
accelerator.prepare()这行代码,它相当于"一键启动分布式模式",自动把模型、优化器、数据加载器都转换成分布式版本。
4.3 运行方式
方式一:使用 accelerate 命令(推荐)
accelerate launch multi-gpu-accelerate-cls.py方式二:使用 PyTorch 原生分布式启动
python -m torch.distributed.launch --nproc_per_node 2 --use_env multi-gpu-accelerate-cls.py运行效果
【train】 epoch: 1/1 step: 1/144 loss: 1.795169
【train】 epoch: 1/1 step: 2/144 loss: 1.744665
【train】 epoch: 1/1 step: 3/144 loss: 1.631625
【train】 epoch: 1/1 step: 4/144 loss: 1.543691
【train】 epoch: 1/1 step: 5/144 loss: 1.788955GPU 使用情况
+-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla T4 On | 00000000:00:06.0 Off | Off |
| N/A 38C P0 70W / 70W | 6021MiB / 16384MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 1 Tesla T4 On | 00000000:00:07.0 Off | Off |
| N/A 38C P0 67W / 70W | 5477MiB / 16384MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+📝通俗解释:从输出可以看到,两个GPU(Tesla T4)都在工作,显存使用约6GB,GPU利用率接近100%,说明分布式训练正在高效运行。
五、总结
Accelerate 分布式训练的核心优势:
- 简单易用:只需几行代码即可实现多GPU分布式训练
- 自动处理复杂逻辑:梯度同步、模型分发等细节自动完成
- 兼容性好:支持多种分布式训练策略和硬件设备
- 灵活性高:可以与 PyTorch Lightning、DeepSpeed 等框架配合使用
📝通俗解释:Accelerate 的设计理念就是"让分布式训练变得像单机训练一样简单",开发者不需要了解底层复杂的分布式通信机制,只需要专注于模型和数据的开发。