Skip to content

图解分布式训练(七)—— Accelerate 分布式训练详解

来源:AiGC面试宝典 作者:宁静致远 日期:2023年09月29日


一、为什么需要 Accelerate 分布式训练?

PyTorch Accelerate 是由 Hugging Face、NVIDIA、AWS 和 Microsoft 等公司联合开发的 PyTorch 加速工具包,旨在简化分布式训练和推理的开发过程,并提高性能。

📝通俗解释:Accelerate 就像是一个"训练加速器",它把分布式训练中复杂的技术细节都封装起来了,让开发者只需要几行代码就能实现多GPU训练,不需要自己手写复杂的通信逻辑。


二、什么是 Accelerate 分布式训练?

2.1 Accelerate 分布式训练介绍

PyTorch Accelerate 提供了一组简单易用的 API,帮助开发者实现以下功能:

  • 分布式训练:在多个 GPU 或多台机器上并行训练模型
  • 混合精度训练:使用半精度浮点数(FP16)加速训练,减少显存占用
  • 自动调参:集成 PyTorch Lightning,支持自动超参数调整
  • 数据加载优化:优化 DataLoader,提升数据加载效率
  • 模型优化:支持 Apex 和 TorchScript 等优化工具

📝通俗解释:Accelerate 就像一个"全能教练",不仅帮你把训练任务分配到多个GPU上(分布式训练),还能用更省内存的方式训练(混合精度),同时帮你自动调整学习率等参数(自动调参)。

2.2 Accelerate 主要优势

特性说明
分布式训练多GPU/多机器并行,缩短训练时间
混合精度训练FP16加速,减少显存使用
自动调参自动调整超参数,提高模型性能
数据加载优化优化数据加载,减少等待时间
模型优化支持 Apex、TorchScript 等工具

📝通俗解释:想象一下你要完成一项大工程(训练大模型),Accelerate 帮你找来了多个工人(GPU),每人负责一部分工作(分布式),而且还教他们用更高效的方法(混合精度),这就是它的核心价值。


三、Accelerate 分布式训练原理

3.1 分布式训练概念

分布式训练是指将训练数据拆分到多个计算设备上并行处理,最后汇总结果得到完整模型。这种方式可以:

  1. 大幅缩短训练时间:多个GPU同时计算
  2. 支持更大batch size:数据分散到多个GPU,单个GPU显存压力减小
  3. 处理超大规模数据:突破单卡显存限制

📝通俗解释:就像一个班级要抄写1000道题目答案,如果只有一个人抄,可能要很久。但如果把题目分成10份,10个人同时抄,就能快10倍。分布式训练就是这个道理。

3.2 加速策略

Accelerate 支持多种分布式训练策略:

3.2.1 Pipeline 并行(流水线并行)

将模型按层拆分,不同计算设备负责不同层的计算。

  • 优点:每个设备只需加载部分模型,突破单卡显存限制
  • 缺点:设备间存在依赖关系,可能产生等待空闲时间
设备1: 层1 → 层2 → 层3 → |→ 设备2: 层4 → 层5 → 层6 → |→ 设备3: 层7 → 输出

📝通俗解释:想象一个生产流水线,第一个工人做完零件交给第二个,第二个做完交给第三个。Pipeline并行就是这个原理,但缺点是如果第一个工人慢了,后面的人就要等着。

3.2.2 Data Parallel(数据并行)

每个计算设备都持有完整的模型副本,但处理不同的数据。

  • 优点:实现简单,所有设备都能完整参与计算
  • 缺点:每个设备都需要完整模型副本,显存压力大
设备1: 完整模型 + 数据A → 输出1
设备2: 完整模型 + 数据B → 输出2
设备3: 完整模型 + 数据C → 输出3
... 然后汇总所有输出

📝通俗解释:就像10个人每人手里都有一本完整的教材,但每个人只做不同的题目。最后把大家的答案汇总起来,就是完整的结果。这是目前最常用的分布式训练方式。

3.2.3 加速器(Accelerator)

指用于加速深度学习的硬件设备,如 GPU、TPU 等。Accelerate 能自动检测并利用可用的加速设备。

📝通俗解释:加速器就是"计算引擎",比如NVIDIA的GPU。Accelerate会自动检测电脑里有多少个GPU,然后合理分配任务。


四、Accelerate 分布式训练实践

4.1 环境安装

bash
pip install accelerate==0.17.1

📝通俗解释:安装这个库就像给电脑装了一个"训练插件",安装完后就能用简单的几行代码实现多GPU训练。

4.2 核心代码实现

4.2.1 导入必要的库

python
import json
import time
import random
import torch
import torch.nn as nn
import numpy as np
import torch.distributed as dist
from sklearn.metrics import classification_report
from accelerate import Accelerator
from torch.utils.data import DataLoader
from transformers import BertForSequenceClassification, BertTokenizer, BertConfig, AdamW

4.2.2 设置随机种子(保证实验可重现)

python
def set_seed(seed=123):
    """设置随机数种子,保证实验可重现"""
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    torch.cuda.manual_seed_all(seed)

📝通俗解释:设置随机种子就像"固定洗牌顺序",保证每次训练的结果是一样的,这样方便我们比较不同参数的效果。

4.2.3 数据处理

python
def get_data():
    with open("data/train.json", "r", encoding="utf-8") as fp:
        data = fp.read()
    data = json.loads(data)
    return data

def load_data():
    data = get_data()
    return_data = []
    for d in data:
        text = d[0]
        label = d[1]
        return_data.append(("".join(text.split(" ")).strip(), label))
    return return_data

4.2.4 自定义 DataLoader 批处理函数

python
class Collate:
    def __init__(self, tokenizer, max_seq_len):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len

    def collate_fn(self, batch):
        input_ids_all = []
        token_type_ids_all = []
        attention_mask_all = []
        label_all = []
        
        for data in batch:
            text = data[0]
            label = data[1]
            inputs = self.tokenizer.encode_plus(
                text=text,
                max_length=self.max_seq_len,
                padding="max_length",
                truncation="longest_first",
                return_attention_mask=True,
                return_token_type_ids=True
            )
            input_ids_all.append(inputs["input_ids"])
            token_type_ids_all.append(inputs["token_type_ids"])
            attention_mask_all.append(inputs["attention_mask"])
            label_all.append(label)

        return {
            "input_ids": torch.tensor(input_ids_all, dtype=torch.long),
            "attention_mask": torch.tensor(attention_mask_all, dtype=torch.long),
            "token_type_ids": torch.tensor(token_type_ids_all, dtype=torch.long),
            "label": torch.tensor(label_all, dtype=torch.long)
        }

📝通俗解释:这段代码做的事情就是把原始文本转换成模型能看懂的数字格式(tokenize),同时把不同长度的句子padding成统一长度。

4.2.5 训练器类定义

python
class Trainer:
    def __init__(self, args, config, model_engine, criterion, optimizer, accelerator):
        self.args = args
        self.config = config
        self.model_engine = model_engine
        self.criterion = criterion
        self.optimizer = optimizer
        self.accelerator = accelerator

    def on_step(self, batch_data):
        """单步前向传播"""
        label = batch_data["label"].cuda()
        input_ids = batch_data["input_ids"].cuda()
        token_type_ids = batch_data["token_type_ids"].cuda()
        attention_mask = batch_data["attention_mask"].cuda()
        
        output = self.model_engine(
            input_ids=input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask,
            labels=label
        )
        logits = output[1]
        return logits, label

    def loss_reduce(self, loss):
        """多GPUloss汇总"""
        rt = loss.clone()
        dist.all_reduce(rt, op=dist.ReduceOp.SUM)
        rt /= torch.cuda.device_count()
        return rt

    def output_reduce(self, outputs, targets):
        """多GPU输出和标签汇总"""
        # 聚合所有GPU的输出
        output_gather_list = [torch.zeros_like(outputs) for _ in range(torch.cuda.device_count())]
        dist.all_gather(output_gather_list, outputs)
        outputs = torch.cat(output_gather_list, dim=0)
        
        # 聚合所有GPU的标签
        target_gather_list = [torch.zeros_like(targets) for _ in range(torch.cuda.device_count())]
        dist.all_gather(target_gather_list, targets)
        targets = torch.cat(target_gather_list, dim=0)
        
        return outputs, targets

📝通俗解释:分布式训练时,每个GPU独立计算自己的数据,最后需要把所有GPU的结果汇总起来(reduce和gather操作),这样才能计算整体的loss和评估指标。

4.2.6 训练、验证和测试方法

python
def train(self, train_loader, dev_loader=None):
    global_step = 1
    best_acc = 0.0
    
    if self.args.local_rank == 0:
        start = time.time()
    
    for epoch in range(1, self.args.epochs + 1):
        for step, batch_data in enumerate(train_loader):
            self.model_engine.train()
            logits, label = self.on_step(batch_data)
            loss = self.criterion(logits, label)
            
            # Accelerate 自动处理梯度反向传播
            self.accelerator.backward(loss)
            self.optimizer.step()
            self.optimizer.zero_grad()
            
            # 汇总所有GPU的loss
            loss = self.loss_reduce(loss)
            
            if self.args.local_rank == 0:
                print("【train】 epoch: {}/{} step: {}/{} loss: {:.6f}".format(
                    epoch, self.args.epochs, global_step, self.args.total_step, loss
                ))
            
            global_step += 1
            
            # 验证阶段
            if self.args.dev and global_step % self.args.eval_step == 0:
                loss, accuracy = self.dev(dev_loader)
                if self.args.local_rank == 0:
                    print("【dev】 loss: {:.6f} accuracy: {:.4f}".format(loss, accuracy))
                    if accuracy > best_acc:
                        best_acc = accuracy
                        print("【best accuracy】 {:.4f}".format(best_acc))
                        torch.save(self.model_engine.state_dict(), self.args.ckpt_path)

    if self.args.local_rank == 0:
        end = time.time()
        print("耗时:{}分钟".format((end - start) / 60))
    
    if not self.args.dev and self.args.local_rank == 0:
        torch.save(self.model_engine.state_dict(), self.args.ckpt_path)

def dev(self, dev_loader):
    """验证方法"""
    self.model_engine.eval()
    correct_total = 0
    num_total = 0
    loss_total = 0.0
    
    with torch.no_grad():
        for step, batch_data in enumerate(dev_loader):
            logits, label = self.on_step(batch_data)
            loss = self.criterion(logits, label)
            loss = self.loss_reduce(loss)
            logits, label = self.output_reduce(logits, label)
            
            loss_total += loss
            logits = logits.detach().cpu().numpy()
            label = label.view(-1).detach().cpu().numpy()
            num_total += len(label)
            
            preds = np.argmax(logits, axis=1).flatten()
            correct_num = (preds == label).sum()
            correct_total += correct_num

    return loss_total, correct_total / num_total

def test(self, model_engine, test_loader, labels):
    """测试方法"""
    self.model_engine = model_engine
    self.model_engine.eval()
    preds = []
    trues = []
    
    with torch.no_grad():
        for step, batch_data in enumerate(test_loader):
            logits, label = self.on_step(batch_data)
            logits, label = self.output_reduce(logits, label)
            
            label = label.view(-1).detach().cpu().numpy().tolist()
            logits = logits.detach().cpu().numpy()
            pred = np.argmax(logits, axis=1).flatten().tolist()
            
            trues.extend(label)
            preds.extend(pred)
    
    report = classification_report(trues, preds, target_names=labels)
    return report

📝通俗解释:训练循环中 accelerator.backward(loss) 是关键,它会自动处理分布式环境下的梯度同步,不需要我们手动编写复杂的通信代码。

4.2.7 优化器构建

python
def build_optimizer(model, args):
    no_decay = ['bias', 'LayerNorm.weight']
    optimizer_grouped_parameters = [
        {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
         'weight_decay': args.weight_decay},
        {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
         'weight_decay': 0.0}
    ]
    optimizer = AdamW(optimizer_grouped_parameters, lr=args.learning_rate)
    return optimizer

📝通俗解释:这里对模型的偏置项和归一化层不施加权重衰减(L2正则化),因为这些参数通常不需要正则化。

4.2.8 参数配置类

python
class Args:
    model_path = "model_hub/chinese-bert-wwm-ext"
    ckpt_path = "output/accelerate/multi-gpu-accelerate-cls.pt"
    max_seq_len = 128
    ratio = 0.92
    epochs = 1
    eval_step = 50
    dev = False
    local_rank = None
    train_batch_size = 32
    dev_batch_size = 32
    weight_decay = 0.01
    learning_rate = 3e-5

4.2.9 主函数

python
def main():
    # ================================
    # 1. 初始化设置
    set_seed()
    label2id = {
        "其他": 0,
        "喜好": 1,
        "悲伤": 2,
        "厌恶": 3,
        "愤怒": 4,
        "高兴": 5,
    }
    args = Args()
    tokenizer = BertTokenizer.from_pretrained(args.model_path)

    # ================================
    # 2. 加载数据集
    data = load_data()
    data = data[:10000]  # 取1万条数据
    random.shuffle(data)
    
    train_num = int(len(data) * args.ratio)
    train_data = data[:train_num]
    dev_data = data[train_num:]

    collate = Collate(tokenizer, args.max_seq_len)
    train_loader = DataLoader(
        train_data,
        batch_size=args.train_batch_size,
        shuffle=True,
        num_workers=2,
        collate_fn=collate.collate_fn
    )
    
    total_step = len(train_loader) * args.epochs // torch.cuda.device_count()
    args.total_step = total_step
    
    dev_loader = DataLoader(
        dev_data,
        batch_size=args.dev_batch_size,
        shuffle=False,
        num_workers=2,
        collate_fn=collate.collate_fn
    )
    test_loader = dev_loader

    # ================================
    # 3. 定义模型、优化器、损失函数
    config = BertConfig.from_pretrained(args.model_path, num_labels=6)
    model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
    model.cuda()
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = build_optimizer(model, args)

    # ================================
    # 4. 初始化 Accelerator(核心步骤)
    accelerator = Accelerator()
    args.local_rank = int(dist.get_rank())
    print(args.local_rank)
    
    # prepare 方法会自动处理分布式训练的各项设置
    model_engine, optimizer_engine, train_loader_engine, dev_loader_engine = accelerator.prepare(
        model, optimizer, train_loader, dev_loader
    )

    # ================================
    # 5. 创建训练器并开始训练
    trainer = Trainer(
        args, config, model_engine, criterion, optimizer_engine, accelerator
    )
    trainer.train(train_loader_engine, dev_loader_engine)

    # ================================
    # 6. 测试阶段
    labels = list(label2id.keys())
    config = BertConfig.from_pretrained(args.model_path, num_labels=6)
    model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
    model.cuda()

    # 需要重新初始化引擎以加载模型
    model_engine, optimizer_engine, train_loader_engine, dev_loader_engine = accelerator.prepare(
        model, optimizer, train_loader, dev_loader
    )
    model_engine.load_state_dict(torch.load(args.ckpt_path))
    report = trainer.test(model_engine, test_loader, labels)
    
    if args.local_rank == 0:
        print(report)

if __name__ == '__main__':
    main()

📝通俗解释:整个流程的核心是 accelerator.prepare() 这行代码,它相当于"一键启动分布式模式",自动把模型、优化器、数据加载器都转换成分布式版本。


4.3 运行方式

方式一:使用 accelerate 命令(推荐)

bash
accelerate launch multi-gpu-accelerate-cls.py

方式二:使用 PyTorch 原生分布式启动

bash
python -m torch.distributed.launch --nproc_per_node 2 --use_env multi-gpu-accelerate-cls.py

运行效果

【train】 epoch: 1/1 step: 1/144 loss: 1.795169
【train】 epoch: 1/1 step: 2/144 loss: 1.744665
【train】 epoch: 1/1 step: 3/144 loss: 1.631625
【train】 epoch: 1/1 step: 4/144 loss: 1.543691
【train】 epoch: 1/1 step: 5/144 loss: 1.788955

GPU 使用情况

+-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla T4            On   | 00000000:00:06.0 Off |                  Off |
| N/A  38C    P0    70W /  70W |    6021MiB / 16384MiB |      0%      Default |
|                               |                      |               N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:00:07.0 Off |                  Off |
| N/A  38C    P0    67W /  70W |    5477MiB / 16384MiB |      0%      Default |
|                               |                      |               N/A |
+-------------------------------+----------------------+----------------------+

📝通俗解释:从输出可以看到,两个GPU(Tesla T4)都在工作,显存使用约6GB,GPU利用率接近100%,说明分布式训练正在高效运行。


五、总结

Accelerate 分布式训练的核心优势:

  1. 简单易用:只需几行代码即可实现多GPU分布式训练
  2. 自动处理复杂逻辑:梯度同步、模型分发等细节自动完成
  3. 兼容性好:支持多种分布式训练策略和硬件设备
  4. 灵活性高:可以与 PyTorch Lightning、DeepSpeed 等框架配合使用

📝通俗解释:Accelerate 的设计理念就是"让分布式训练变得像单机训练一样简单",开发者不需要了解底层复杂的分布式通信机制,只需要专注于模型和数据的开发。

基于 MIT 许可发布