Skip to content

图解分布式训练(六)—— PyTorch 的 DeepSpeed 详细解析

📝通俗解释:DeepSpeed 是微软开发的一个分布式训练工具,它的核心特点是能让"大模型"在"小显存"的GPU上训练。想象一下,原来需要10张卡才能训练的模型,现在可能1-2张卡就够了,这就是DeepSpeed的魔力。


动机

📝通俗解释:为什么需要DeepSpeed?就像一个大力士只能搬动小块砖头,但通过工具可以把大石头拆分成小块搬运一样。DeepSpeed就是那个"拆分工具",把大模型的参数拆分到多个GPU上,让每张卡只负责一部分。

最常见的深度学习框架应该是 TensorFlow、PyTorch、Keras,但是这些框架在面向大规模模型的时候都不是很方便。

比如 PyTorch 的分布式并行计算框架(Distributed Data Parallel,简称 DDP),它也仅仅是能将数据并行,放到各个 GPU 的模型上进行训练。

也就是说,DDP 的应用场景在你的模型大小大于显卡显存大小时,它就很难继续使用了,除非你自己再将模型参数拆散分散到各个 GPU 上。

今天要给大家介绍的 DeepSpeed,它就能实现这个拆散功能,它通过将模型参数拆散分布到各个 GPU 上,以实现大型模型的计算,弥补了 DDP 的缺点,非常方便,这也就意味着我们能用更少的 GPU 训练更大的模型,而且不受限于显存。


一、为什么需要 DeepSpeed?

📝通俗解释:大模型训练就像是要做一个超大的蛋糕,一台烤箱(GPU)放不下怎么办?就需要多台烤箱同时工作。DeepSpeed就是协调多台烤箱工作的"厨房管理系统"。

大模型(LLM)在训练时往往需要大量内存来存储中间激活、权重等参数,百亿模型甚至无法在单个 GPU 上进行训练,使得模型训练在某些情况下非常低效和不可能。这就需要进行多卡,或者多节点分布式训练。

在大规模深度学习模型训练中有个主要范式:

  • 数据并行
  • 模型并行

目前训练超大规模语言模型技术路线:GPU + PyTorch + Megatron-LM + DeepSpeed

DeepSpeed 是由 Microsoft 提供的分布式训练工具,旨在支持更大规模的模型和提供更多的优化策略和工具。与其他框架相比,DeepSpeed 支持更大规模的模型和提供更多的优化策略和工具。其中,主要优势在于支持更大规模的模型、提供了更多的优化策略和工具(例如 ZeRO 和 Offload 等)

📝通俗解释:DeepSpeed的四大核心功能:

  1. 3D并行化 - 就像同时用多个厨师、多条流水线、多个烤箱来做一个巨大的蛋糕
  2. ZeRO-Offload - 把部分工作"外包"给CPU,减轻GPU的负担
  3. 稀疏注意力 - 像快速阅读文章时只关注重点段落,跳过无关内容
  4. 1比特Adam - 减少通信量,就像用更少的快递员来传递消息
  1. 用 3D 并行化实现万亿参数模型训练

    • DeepSpeed 实现了三种并行方法的灵活组合:ZeRO 支持的数据并行,流水线并行和张量切片模型并行
    • 3D 并行性适应了不同工作负载的需求,以支持具有万亿参数的超大型模型,同时实现了近乎完美的显存扩展性和吞吐量扩展效率
    • 此外,其提高的通信效率使用户可以在网络带宽有限的常规群集上以 2-7 倍的速度训练有数十亿参数的模型
  2. ZeRO-Offload 使 GPU 单卡能够训练 10 倍大的模型

    • 为了同时利用 CPU 和 GPU 内存来训练大型模型,我们扩展了 ZeRO-2
    • 用户在使用带有单张英伟达 V100 GPU 的机器时,可以在不耗尽显存的情况下运行多达 130 亿个参数的模型,模型规模扩展至现有方法的 10 倍,并保持有竞争力的吞吐量
    • 此功能使数十亿参数的模型训练更加大众化,并为许多深度学习从业人员打开了一扇探索更大更好的模型的窗户
  3. 通过 DeepSpeed Sparse Attention 用 6 倍速度执行 10 倍长的序列

    • DeepSpeed 提供了稀疏 attention kernel——一种工具性技术,可支持长序列的模型输入,包括文本输入,图像输入和语音输入
    • 与经典的稠密 Transformer 相比,它支持的输入序列长一个数量级,并在保持相当的精度下获得最高 6 倍的执行速度提升
    • 它还比最新的稀疏实现快 1.5-3 倍
    • 此外,我们的稀疏 kernel 灵活支持稀疏格式,使用户能够通过自定义稀疏结构进行创新
  4. 1 比特 Adam 减少 5 倍通信量

    • Adam 是一个在大规模深度学习模型训练场景下的有效的(也许是最广为应用的)优化器
    • 然而,它与通信效率优化算法往往不兼容。因此,在跨设备进行分布式扩展时,通信开销可能成为瓶颈
    • 我们推出了一种 1 比特 Adam 新算法,以及其高效实现。该算法最多可减少 5 倍通信量,同时实现了与 Adam 相似的收敛率
    • 在通信受限的场景下,我们观察到分布式训练速度提升了 3.5 倍,这使得该算法可以扩展到不同类型的 GPU 群集和网络环境

二、DeepSpeed 基本概念 介绍一下?

2.1 DeepSpeed 介绍

📝通俗解释:DeepSpeed的核心概念 - 想象你在指挥一个乐队:

  • 节点编号 = 乐器组编号(弦乐组、铜管组等)
  • 全局进程编号 = 乐队中每个乐手的唯一编号
  • 局部进程编号 = 某个乐器组内的乐手编号
  • 全局总进程数 = 乐队总人数
  • 主节点 = 指挥家
  • 在分布式计算环境中,需要理解几个非常基础的概念:节点编号、全局进程编号、局部进程编号、全局总进程数和主节点。其中,主节点负责协调所有其他节点和进程的工作,因此是整个系统的关键部分。

  • DeepSpeed 还提供了 mpi、gloo 和 nccl 等通信策略,可以根据具体情况进行选择和配置。在使用 DeepSpeed 进行分布式训练时,可以根据具体情况选择合适的通信库,例如在 CPU 集群上进行分布式训练,可以选择 mpi 和 gloo;如果是在 GPU 上进行分布式训练,可以选择 nccl。

📝通俗解释:通信策略的选择 - 就像选择快递方式:

  • mpi = 普通快递,适合跨城市(跨节点)运输
  • gloo = 同城快递,可以是CPU或GPU
  • nccl = 专人直送,专门给NVIDIA GPU用的最快的通信方式
  • ZeRO (Zero Redundancy Optimizer) 是一种用于大规模训练优化的技术,主要是用来减少内存占用。ZeRO 将模型参数分成了三个部分:Optimizer States(优化器状态)、Gradient(梯度)和 Model Parameter(模型参数)。在使用 ZeRO 进行分布式训练时,可以选择 ZeRO-Offload 和 ZeRO-Stage3 等不同的优化技术。

📝通俗解释:ZeRO的工作原理 - 就像一个团队搬家:

  • 不使用ZeRO:每个人都复制一份所有家具(浪费空间)
  • ZeRO-1:每人只负责一种家具的清单
  • ZeRO-2:每人只负责一种家具的清单+搬运
  • ZeRO-3:按需调用,需要时才去别人那里拿
  • 混合精度训练是指在训练过程中同时使用 FP16(半精度浮点数)和 FP32(单精度浮点数)两种精度的技术。使用 FP16 可以大大减少内存占用,从而可以训练更大规模的模型。在使用混合精度训练时,需要使用一些技术来解决可能出现的梯度消失和模型不稳定的问题,例如动态精度缩放和混合精度优化器等。

📝通俗解释:混合精度 - 就像用不同精度的计算器:

  • FP32 = 精确计算器(慢但准)
  • FP16 = 快速心算(快但可能有点误差)
  • 混合精度 = 大部分用快速心算,关键步骤用精确计算器
  • 结合使用 huggingface 和 deepspeed

2.2 DeepSpeed 基础的概念

📝通俗解释:分布式训练的基本概念 - 这是一个多层级的组织结构

在分布式计算环境中,有几个非常基础的概念需要理解:

  • 节点编号(node_rank):分配给系统中每个节点的唯一标识符,用于区分不同计算机之间的通信。
  • 全局进程编号(rank):分配给整个系统中的每个进程的唯一标识符,用于区分不同进程之间的通信。
  • 局部进程编号(local_rank):分配给单个节点内的每个进程的唯一标识符,用于区分同一节点内的不同进程之间的通信。
  • 全局总进程数(world_size):在整个系统中运行的所有进程的总数,用于确定可以并行完成多少工作以及需要完成任务所需的资源数量。

📝通俗解释:world_size vs word_size - 注意是world(世界),不是word(单词),这是很多初学者容易拼错的地方

  • 主节点(master_ip+master_port):在分布式计算环境中,主节点负责协调所有其他节点和进程的工作,为了确定主节点,我们需要知道它的 IP 地址和端口号。主节点还负责监控系统状态、处理任务分配和结果汇总等任务,因此是整个系统的关键部分。

2.3 DeepSpeed 支持的功能

  • DeepSpeed 目前支持的功能:
    • Optimizer state partitioning (ZeRO stage 1) - 优化器状态分片
    • Gradient partitioning (ZeRO stage 2) - 梯度分片
    • Parameter partitioning (ZeRO stage 3) - 参数分片
    • Custom mixed precision training handling - 自定义混合精度训练
    • A range of fast CUDA-extension-based optimizers - 基于CUDA扩展的快速优化器
    • ZeRO-Offload to CPU and NVMe - 将数据卸载到CPU或NVMe硬盘

三、DeepSpeed 通信策略 介绍一下?

📝通俗解释:通信策略 - 就像团队协作时的沟通方式

DeepSpeed 还提供了 mpi、gloo 和 nccl 等通信策略,可以根据具体情况进行选择和配置:

  • mpi 是一种跨节点通信库,常用于 CPU 集群上的分布式训练
  • gloo 是一种高性能的分布式训练框架,支持 CPU 和 GPU 上的分布式训练
  • nccl 是 NVIDIA 提供的 GPU 专用通信库,被广泛应用于 GPU 上的分布式训练

📝通俗解释:选择建议 - 一般情况下:

  • 多节点训练:优先选nccl(最快)
  • CPU集群:选mpi或gloo
  • 单机多卡:nccl是默认最佳选择

在使用 DeepSpeed 进行分布式训练时,可以根据具体情况选择合适的通信库。通常情况下,如果是在 CPU 集群上进行分布式训练,可以选择 mpi 和 gloo;如果是在 GPU 上进行分布式训练,可以选择 nccl。

bash
# 调试时可以设置这个环境变量来定位问题
export CUDA_LAUNCH_BLOCKING=1

📝通俗解释:CUDA_LAUNCH_BLOCKING - 就像开启"慢动作模式",把异步的GPU操作改成同步执行,方便定位是哪一行代码出了问题


四、DeepSpeed 如何使用?

4.1 DeepSpeed 安装

bash
$ pip install deepspeed==0.8.1
$ sudo apt-get update
$ sudo apt-get install openmpi-bin libopenmpi-dev
$ pip install mpi4py

📝通俗解释:安装说明 - DeepSpeed的安装需要:

  1. PyTorch(基础环境)
  2. deepspeed包(核心库)
  3. OpenMPI(通信库)
  4. mpi4py(Python的MPI接口,如果使用mpi通信策略才需要)

4.2 DeepSpeed 分布式启动器各命令含义

Argument含义示例
master_port主节点的端口号--master_port 29500
master_addr主节点的IP--master_addr=10.51.97.28 (ifconfig->eth0->inet)
nnodes节点数两台机器,--nnodes=2
node_rank节点rank,以第一台机器为0开始递增--node_rank=0 master,即主节点rank
nproc_per_node每个节点的进程数一个节点使用八张卡,nproc_per_node=8

📝通俗解释:启动参数 - 就像组网会议需要确定:

  • 在哪里开(master_addr)
  • 端口号是多少(master_port)
  • 有多少台电脑(nnodes)
  • 每台电脑用几张卡(nproc_per_node)
  • 这是第几台电脑(node_rank)

4.3 DeepSpeed 使用

📝通俗解释:使用流程 - DeepSpeed的使用其实和普通PyTorch模型训练差不多,主要区别在于:

  1. 用deepspeed.initialize()初始化模型和优化器
  2. 用model_engine代替model进行前向传播
  3. 用model_engine.backward()代替loss.backward()
  4. 用model_engine.step()代替optimizer.step()

注:使用 DeepSpeed 其实和写一个 pytorch 模型只有部分区别,一开始的流程是一样的。

1. 导包

python
import json, time, random
import torch
from sklearn.metrics import classification_report
from torch.utils.data import DataLoader
from collections import Counter
from transformers import BertForMaskedLM, BertTokenizer, BertForSequenceClassification, BertConfig, AdamW
import torch.nn as nn
import numpy as np

# 导入 deepspeed 分布式训练包
import deepspeed
import torch.distributed as dist

# 定义 设置随机种子
def set_seed(seed=123):
    """
    设置随机数种子,保证实验可重现
    :param seed:
    :return:
    """
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    torch.cuda.manual_seed_all(seed)

2. 定义获取和加载训练集函数

python
def get_data():
    with open("data/train.json", "r", encoding="utf-8") as fp:
        data = fp.read()
        data = json.loads(data)
    return data

def load_data():
    data = get_data()
    return_data = []
    # [(文本, 标签id)]
    for d in data:
        text = d[0]
        label = d[1]
        return_data.append((" ".join(text.split(" ")).strip(), label))
    return return_data

3. 定义训练集编码类

python
class Collate:
    def __init__(
        self,
        tokenizer,
        max_seq_len,
    ):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len

    def collate_fn(self, batch):
        input_ids_all = []
        token_type_ids_all = []
        attention_mask_all = []
        label_all = []
        for data in batch:
            text = data[0]
            label = data[1]
            inputs = self.tokenizer.encode_plus(
                text=text,
                max_length=self.max_seq_len,
                padding="max_length",
                truncation="longest_first",
                return_attention_mask=True,
                return_token_type_ids=True
            )
            input_ids = inputs["input_ids"]
            token_type_ids = inputs["token_type_ids"]
            attention_mask = inputs["attention_mask"]
            input_ids_all.append(input_ids)
            token_type_ids_all.append(token_type_ids)
            attention_mask_all.append(attention_mask)
            label_all.append(label)

        input_ids_all = torch.tensor(input_ids_all, dtype=torch.long)
        token_type_ids_all = torch.tensor(token_type_ids_all, dtype=torch.long)
        attention_mask_all = torch.tensor(attention_mask_all, dtype=torch.long)
        label_all = torch.tensor(label_all, dtype=torch.long)
        return_data = {
            "input_ids": input_ids_all,
            "attention_mask": attention_mask_all,
            "token_type_ids": token_type_ids_all,
            "label": label_all
        }
        return return_data

4. 定义 Trainer 训练类

python
class Trainer:
    def __init__(
        self,
        args,
        config,
        model_engine,
        criterion,
        optimizer
    ):
        self.args = args
        self.config = config
        self.model_engine = model_engine
        self.criterion = criterion
        self.optimizer = optimizer

    def on_step(self, batch_data):
        label = batch_data["label"].cuda()
        input_ids = batch_data["input_ids"].cuda()
        token_type_ids = batch_data["token_type_ids"].cuda()
        attention_mask = batch_data["attention_mask"].cuda()
        output = self.model_engine.forward(
            input_ids=input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask,
            labels=label
        )
        logits = output[1]
        return logits, label

    # loss 聚合计算
    def loss_reduce(self, loss):
        rt = loss.clone()
        dist.all_reduce(rt, op=dist.ReduceOp.SUM)
        rt /= torch.cuda.device_count()
        return rt

    # output 聚合
    def output_reduce(self, outputs, targets):
        output_gather_list = [torch.zeros_like(outputs) for _ in range(torch.cuda.device_count())]
        # 把每一个GPU的输出聚合起来
        dist.all_gather(output_gather_list, outputs)

        outputs = torch.cat(output_gather_list, dim=0)
        target_gather_list = [torch.zeros_like(targets) for _ in range(torch.cuda.device_count())]
        # 把每一个GPU的输出聚合起来
        dist.all_gather(target_gather_list, targets)
        targets = torch.cat(target_gather_list, dim=0)
        return outputs, targets

    def train(self, train_loader, dev_loader=None):
        global_step = 1  # 修正:gloabl_step -> global_step
        best_acc = 0.
        if self.args.local_rank == 0:
            start = time.time()
        for epoch in range(1, self.args.epochs + 1):
            for step, batch_data in enumerate(train_loader):
                self.model_engine.train()
                logits, label = self.on_step(batch_data)
                loss = self.criterion(logits, label)
                self.model_engine.backward(loss)
                self.model_engine.step()
                # loss 聚合计算
                loss = self.loss_reduce(loss)
                if self.args.local_rank == 0:
                    print("【train】 epoch: {}/{} step: {}/{} loss: {:.6f}".format(
                        epoch, self.args.epochs, global_step, self.args.total_step, loss
                    ))
                global_step += 1
                if self.args.dev:
                    if global_step % self.args.eval_step == 0:
                        loss, accuracy = self.dev(dev_loader)
                        if self.args.local_rank == 0:
                            print("【dev】 loss: {:.6f} accuracy: {:.4f}".format(loss, accuracy))
                        if accuracy > best_acc:
                            best_acc = accuracy
                            self.model_engine.save_checkpoint(self.args.ckpt_path, save_latest=True)
                            if self.args.local_rank == 0:
                                print("【best accuracy】 {:.4f}".format(best_acc))

        if self.args.local_rank == 0:
            end = time.time()
            print("耗时:{}分钟".format((end - start) / 60))
        if not self.args.dev:
            self.model_engine.save_checkpoint(self.args.ckpt_path, save_latest=True)

    def dev(self, dev_loader):
        self.model_engine.eval()
        correct_total = 0
        num_total = 0
        loss_total = 0.
        with torch.no_grad():
            for step, batch_data in enumerate(dev_loader):
                logits, label = self.on_step(batch_data)
                loss = self.criterion(logits, label)
                # loss 聚合计算
                loss = self.loss_reduce(loss)
                # output 聚合
                logits, label = self.output_reduce(logits, label)
                loss_total += loss
                logits = logits.detach().cpu().numpy()
                label = label.view(-1).detach().cpu().numpy()
                num_total += len(label)
                preds = np.argmax(logits, axis=1).flatten()
                correct_num = (preds == label).sum()
                correct_total += correct_num

        return loss_total, correct_total / num_total

    def test(self, model, test_loader, labels):
        self.model_engine = model
        self.model_engine.eval()
        preds = []
        trues = []
        with torch.no_grad():
            for step, batch_data in enumerate(test_loader):
                logits, label = self.on_step(batch_data)
                # output 聚合
                logits, label = self.output_reduce(logits, label)
                label = label.view(-1).detach().cpu().numpy().tolist()
                logits = logits.detach().cpu().numpy()
                pred = np.argmax(logits, axis=1).flatten().tolist()
                trues.extend(label)
                preds.extend(pred)

        # print(trues, preds, labels)
        print(np.array(trues).shape, np.array(preds).shape)
        report = classification_report(trues, preds, target_names=labels)
        return report

5. Args 参数类定义

python
class Args:
    model_path = "/mnt/kaimo/data/pretrain/bert-base-chinese"
    ckpt_path = "output/deepspeed/"
    max_seq_len = 128
    ratio = 0.92
    epochs = 5
    eval_step = 50
    dev = False
    local_rank = None

6. 初始化 DeepSpeed 引擎

python
deepspeed_config = {
    "train_micro_batch_size_per_gpu": 32,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 3e-5
        }
    },
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,              # ZeRO第3阶段
        "allgather_partitions": True,
        "allgather_bucket_size": 2e8,
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 2e8
    },
    "activation_checkpointing": {
        "partition_activations": True,
        "cpu_checkpointing": True,
        "contiguous_memory_optimization": True
    },
    "wall_clock_breakdown": True,
    "log_dist": False,
}

📝通俗解释:配置参数说明 - 核心配置项:

  • train_micro_batch_size_per_gpu: 每个GPU的小批次大小
  • gradient_accumulation_steps: 梯度累积步数(相当于"虚拟"增大batch size)
  • fp16: 混合精度训练开关
  • zero_optimization.stage: ZeRO的阶段(0=不用,1=优化器分片,2+=梯度分片,3+=参数分片)
  • activation_checkpointing: 激活值检查点,用时间换内存

注:需要注意的是在ZeRO第3阶段,模型被划分到不同的GPU上了

7. 主函数 main()

python
def main():
    # ================================
    # 定义相关参数
    set_seed()
    label2id = {
        "其他": 0,
        "喜好": 1,
        "悲伤": 2,
        "厌恶": 3,
        "愤怒": 4,
        "高兴": 5,
    }
    args = Args()
    tokenizer = BertTokenizer.from_pretrained(args.model_path)
    # ================================

    # ================================
    # 加载数据集
    data = load_data()
    # 取1万条数据出来
    data = data[:10000]
    random.shuffle(data)
    train_num = int(len(data) * args.ratio)
    train_data = data[:train_num]
    dev_data = data[train_num:]

    collate = Collate(tokenizer, args.max_seq_len)
    train_loader = DataLoader(
        train_data,
        batch_size=deepspeed_config["train_micro_batch_size_per_gpu"],
        shuffle=True,
        num_workers=2,
        collate_fn=collate.collate_fn
    )
    total_step = len(train_loader) * args.epochs
    args.total_step = total_step
    dev_loader = DataLoader(
        dev_data,
        batch_size=deepspeed_config["train_micro_batch_size_per_gpu"],
        shuffle=False,
        num_workers=2,
        collate_fn=collate.collate_fn
    )
    test_loader = dev_loader
    # ================================

    # ================================
    # 定义模型、优化器、损失函数
    config = BertConfig.from_pretrained(args.model_path, num_labels=6)
    model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
    model.cuda()
    criterion = torch.nn.CrossEntropyLoss()
    '''注: 这里需要用 deepspeed 初始化 model 和 optimizer'''
    model_engine, optimizer, _, _ = deepspeed.initialize(
        config=deepspeed_config,
        model=model,
        model_parameters=model.parameters()
    )

    args.local_rank = model_engine.local_rank
    # ==============================

    # ==============================
    # 定义训练器
    trainer = Trainer(
        args,
        config,
        model_engine,
        criterion,
        optimizer
    )

    # 训练和验证
    trainer.train(train_loader, dev_loader)
    # ==============================

    # ==============================
    # 测试
    labels = list(label2id.keys())
    config = BertConfig.from_pretrained(args.model_path, num_labels=6)
    model = BertForSequenceClassification.from_pretrained(args.model_path, config=config)
    model.cuda()

    # 需要重新初始化引擎
    model_engine, optimizer, _, _ = deepspeed.initialize(
        config=deepspeed_config,
        model=model,
        model_parameters=model.parameters()
    )
    model_engine.load_checkpoint(args.ckpt_path, load_module_only=True)
    report = trainer.test(model_engine, test_loader, labels)
    if args.local_rank == 0:
        print(report)
    # ==============================

if __name__ == '__main__':
    main()

8. 运行代码

开始运行代码:

bash
$ deepspeed test.py --deepspeed_config config.json

📝通俗解释:运行命令 - DeepSpeed的启动方式和普通Python不同,需要用deepspeed命令来启动,它会自动处理多GPU/多节点的分布式环境

测试的时候发现每块GPU对每批数据都进行计算了一次,这里可能需要做些修改,暂时还没找到相关的方法。

text
[2023-06-28 00:58:12, 759] [INFO] [engine.py:2824:_get_all_zero_checkpoints_dicts] successfully read 2 ZeRO state_dicts for rank 0
[2023-06-28 00:58:13, 005] [INFO] [engine.py:2774:_load_zero_checkpoint] lg 2 zero partition checkpoints for rank 1
[2023-06-28 00:58:13, 005] [INFO] [engine.py:2774:_load_zero_checkpoint] lg 2 zero partition checkpoints for rank 0
(1600,) (1600,)
precisionrecallf1-scoresupport
其他0.620.670.64546
喜好0.500.570.53224
悲伤0.490.390.44228
厌恶0.420.420.42240
愤怒0.580.480.53124
高兴0.640.620.63238
accuracy0.561600
macro avg0.540.530.531600
weighted avg0.550.560.551600
text
(1600,) (1600,)

9. 训练模型转化

在 /output/deepspeed 生成多GPU的模型:

bash
$ ls output/deepspeed/global_step1440/
>>>
zero_pp_rank_0_mp_rank_00_model_states.pt
zero_pp_rank_0_mp_rank_00_optim_states.pt
zero_pp_rank_1_mp_rank_00_model_states.pt
zero_pp_rank_1_mp_rank_00_optim_states.pt

需要利用 /output/deepspeed 下有一个 zero_to_fp32.py 文件,我们可以利用将多GPU的模型转换为完整的:

bash
$ python zero_to_fp32.py output/deepspeed/ ./pytorch_model.bin

📝通俗解释:模型转换 - ZeRO会把模型参数分散存储在多个GPU上,训练完成后需要用zero_to_fp32.py把分散的参数"拼"成一个完整的模型文件,就像把拼图拼成完整图画一样


五、DeepSpeed 全部代码

(完整代码见上方第4节,此处省略重复内容)


六、优化器和调度器

📝通俗解释:优化器和调度器 - 优化器像是"调整步伐"的教练,调度器像是"控制节奏"的节拍器

当不使用 offload_optimizer 时,可以按照下表,混合使用 HF 和 DS 的优化器和调度器,除了 HF Scheduler 和 DS Optimizer 这一种情况。

组合HF SchedulerDS Scheduler
HF OptimizerYesYes
DS OptimizerNoYes

6.1 优化器

  • 启用 offload_optimizer 时可以使用非 DeepSpeed 的优化器,只要它同时具有 CPU 和 GPU 的实现(LAMB 除外)。
  • DeepSpeed 的主要优化器是 Adam、AdamW、OneBitAdam 和 Lamb。这些已通过 ZeRO 进行了彻底测试,建议使用。
  • 如果没有在配置文件中配置优化器参数,Trainer 将自动将其设置为 AdamW,并将使用命令行参数的默认值:--learning_rate--adam_beta1--adam_beta2--adam_epsilon--weight_decay
  • 与 AdamW 类似,可以配置其他官方支持的优化器。请记住,它们可能具有不同的配置值。例如对于 Adam,需要将 weight_decay 设置为 0.01 左右。
  • 此外,offload 在与 Deepspeed 的 CPU Adam 优化器一起使用时效果最佳。如果想对 offload 使用不同的优化器,deepspeed==0.8.3 以后的版本,还需要添加:
json
{
    "zero_force_ds_cpu_optimizer": false
}

📝通俗解释:优化器选择建议:

  • 首选:DeepSpeed自带的AdamW(经过充分测试)
  • 如果要offload到CPU:使用DeepSpeed的CPU Adam
  • 大模型:可以考虑Lamb优化器(Layer-wise Adaptive Rate)

6.2 调度器

  • DeepSpeed 支持 LRRangeTest、OneCycle、WarmupLR 和 WarmupDecayLR 学习率调度器。
  • Transformers 和 DeepSpeed 中调度器的 overlap:
text
WarmupLR 使用 --lr_scheduler_type constant_with_warmup
WarmupDecayLR 使用 --lr_scheduler_type linear

📝通俗解释:学习率调度器 - 就像跑步时的体力分配:

  • WarmupLR: 开始时慢慢加速(热身)
  • WarmupDecayLR: 热身后逐渐减速
  • OneCycle: 先加速再减速(像冲刺)
  • LRRangeTest: 用来测试最佳学习率

七、训练精度

📝通俗解释:训练精度选择 - 这是一个"速度vs精度"的权衡游戏

  • 由于 fp16 混合精度大大减少了内存需求,并可以实现更快的速度,因此只有在此训练模式下表现不佳时,才考虑不使用混合精度训练。通常,当模型未在 fp16 混合精度中进行预训练时,会出现这种情况(例如,使用 bf16 预训练的模型)。这样的模型可能会溢出,导致 loss 为 NaN。如果是这种情况,使用完整的 fp32 模式。
  • 如果是基于 Ampere 架构的 GPU,pytorch 1.7 及更高版本将自动切换为使用更高效的 tf32 格式进行某些操作,但结果仍将采用 fp32。
  • 使用 Trainer,可以使用 --tf32 启用它,或使用 --tf32 0 或 --no_tf32 禁用它。PyTorch 默认值是使用 tf32。

7.1 自动混合精度

  • fp16

    • 可以使用 pytorch-like AMP 方式或者 apex-like 方式
    • 使用 --fp16、--fp16_backend amp 或 --fp16_full_eval 命令行参数时启用此模式
  • bf16

    • 使用 --bf16 或 --bf16_full_eval 命令行参数时启用此模式

📝通俗解释:fp16 vs bf16:

  • fp16: 精度更高(16位),但范围较小,可能溢出
  • bf16: 精度较低(16位),但范围更大,不容易溢出
  • 建议:Ampere架构GPU用bf16,旧GPU用fp16

7.2 NCCL

  • 通讯会采用一种单独的数据类型
  • 默认情况下,半精度训练使用 fp16 作为 reduction 操作的默认值
  • 可以增加一个小的开销并确保 reduction 将使用 fp32 作为累积数据类型
json
{
  "communication_data_type": "fp32"
}

📝通俗解释:通信数据类型 - 虽然训练用fp16,但通信时可以用fp32来避免精度损失,就像用更好的包装箱来运输货物

7.3 Apex

  • Apex 是一个在 PyTorch 深度学习框架下用于加速训练和提高性能的库。Apex 提供了混合精度训练、分布式训练和内存优化等功能,帮助用户提高训练速度、扩展训练规模以及优化 GPU 资源利用率。
  • 使用 --fp16、--fp16_backend apex、--fp16_opt_level 01 命令行参数时启用此模式
json
"amp": {
    "enabled": "auto",
    "opt_level": "auto"
}

八、获取模型参数

📝通俗解释:模型参数获取 - DeepSpeed会把模型参数分散存储,需要特殊方式提取

  • DeepSpeed 会在优化器参数中存储模型的主参数,存储在 global_step*/*optim_states.pt 文件中,数据类型为 fp32。因此,想要从 checkpoint 中恢复训练,则保持默认即可
  • 如果模型是在 ZeRO-2 模式下保存的,模型参数会以 fp16 的形式存储在 pytorch_model.bin
  • 如果模型是在 ZeRO-3 模式下保存的,需要如下所示设置参数,否则 pytorch_model.bin 将不会被创建
json
{
  "zero_optimization": {
    "stage3_gather_16bit_weights_on_model_save": true
  }
}
  • 离线获取 fp32 权重
bash
$ python zero_to_fp32.py . pytorch_model.bin

8.1 ZeRO-3 and Infinity Nuances

  • 构造超大模型:对于超大模型(超过GPU显存),可以采用ZeRO-3 + NVMe offload的方式,将部分参数存储在硬盘上
  • 搜集参数:在ZeRO-3中,前向传播时需要收集分散在各个GPU上的参数,通过gathered_parameters上下文管理器来控制

📝通俗解释:ZeRO-3的注意事项 - ZeRO-3会把模型参数拆散放到不同GPU上,所以:

  1. 保存时需要特殊设置才能得到完整的模型文件
  2. 前向传播时需要临时"收集"参数
  3. 适合超大模型,小模型用ZeRO-3反而更慢

填坑笔记

1. ModuleNotFoundError: No module named 'torch._six'

  • 报错内容ModuleNotFoundError: No module named 'torch._six'
  • 解决方法
    • 找到报错的文件
    • 注释掉:from torch._six import string_classes
    • 加入:
      python
      int_classes = int
      string_classes = str
    • 如果还报错:NameError: name 'inf' is not defined
    • 找到文件中的那一行,前面加入:
      python
      import math
      inf = math.inf

📝通俗解释:这个错误通常是因为PyTorch版本升级后,某些内部API发生了变化。torch._six是PyTorch早期版本用来处理Python2/3兼容性的,Python3已经不需要了

2. 为什么单卡的情况,也可以使用 deepspeed?

  1. 使用 ZeRO-offload,将部分数据 offload 到 CPU,降低对显存的需求
  2. 提供了对显存的管理,减少显存中的碎片

📝通俗解释:单卡使用DeepSpeed的好处 - 就像请了一个"内存管家":

  • ZeRO-offload: 把不用的数据"寄存"到CPU内存
  • 显存碎片整理: 就像整理房间,让空间利用率更高

3. 不同 ZeRO 如何配置

3.1 ZeRO-2

配置示例:

json
{
    "fp16": {
        "enabled": "auto",
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": "auto",
            "betas": "auto",
            "eps": "auto",
            "weight_decay": "auto"
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": "auto",
            "warmup_max_lr": "auto",
            "warmup_num_steps": "auto"
        }
    },
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {
            "device": "cpu

基于 MIT 许可发布