Skip to content

图解分布式训练(二)—— nn.DataParallel篇

来源:AiGC面试宝典 作者:宁静致远 日期:2023年09月29日

---## 为什么需要nn.DataParallel?

多GPU并行训练的原理是将模型参数和数据分布到多个GPU上,同时利用多个GPU的计算能力加速训练过程。具体实现需要考虑以下两个核心问题:

1. 数据如何划分?

由于模型需要处理的数据量通常很大,将所有数据放入单个GPU内存中可能会导致内存不足,因此我们需要将数据划分到多个GPU上。主要有两种划分方式:

  • 数据并行(Data Parallelism):将数据分割成多个小批次,每个GPU处理其中的一个小批次,然后将梯度汇总后更新模型参数。
  • 模型并行(Model Parallelism):将模型分解成多个部分,每个GPU处理其中一个部分,并将处理结果传递给其他GPU以获得最终结果。

📝 通俗解释:数据并行就像一条流水线上的多个工人,每个工人处理产品的一部分;模型并行则像把一个复杂产品拆分成多个简单部件,分别由不同工人负责生产。

2. 计算如何协同?

每个GPU都需要计算模型参数的梯度并将其发送给其他GPU,因此需要使用同步机制来保证计算正确性。主要有两种同步方式:

  • 同步数据并行:在每个GPU上计算模型参数的梯度,然后将梯度发送到其他GPU上进行汇总,最终更新模型参数。
  • 异步数据并行:每个GPU独立计算梯度并更新模型参数,不需要等待其他GPU。

📝 通俗解释:同步方式就像接力赛,每个人跑完自己的部分后必须等下一棒准备好才能继续;异步方式则像各自独立跑完全程,最后再比较成绩。


一、PyTorch中的GPU操作默认是什么样?

PyTorch中的GPU操作默认是异步的。当调用函数需要使用GPU时,该函数操作会进入特定设备队列中等待执行。正因为此,PyTorch支持并行计算。不过,PyTorch并行计算的效果对调用者是不可见的。

PyTorch中的多GPU并行计算是数据级并行,相当于开了多个进程,每个进程独立运行,然后再整合在一起。

python
device_ids = [0, 1]
net = torch.nn.DataParallel(net, device_ids=device_ids)

📝 通俗解释:GPU异步操作就像餐厅厨房的叫号系统,服务员把订单放到队列里,厨师按顺序做,顾客不需要站在厨房门口等做好了才能点下一道菜。


二、nn.DataParallel函数介绍

函数定义

python
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

函数参数:

参数说明
module即模型,此处注意,虽然输入数据被均分到不同GPU上,但每个GPU上都要拷贝一份模型
device_ids参与训练的GPU列表,例如三块卡,device_ids = [0, 1, 2]
output_device指定输出GPU,一般省略。在省略的情况下,默认为第一块卡,即索引为0的卡

📝 通俗解释:参数module就像是要复印的文件,每个GPU都需要一份完整的复印件;device_ids就像指定哪些打印机同时工作;output_device就像指定在哪台打印机上汇总最终结果。

注意:输入计算是被多块卡均分的,但输出loss的计算是由output_device这一张卡独自承担的,这就造成这张卡所承受的计算量要大于其他参与训练的卡。

使用实例

python
net = torch.nn.Linear(100, 1)
print(net)
print('-------------------')
net = torch.nn.DataParallel(net, device_ids=[0, 3])
print(net)

输出

Linear(in_features=100, out_features=1, bias=True)
-------------------
DataParallel(
  (module): Linear(in_features=100, out_features=1, bias=True)
)

📝 通俗解释:从输出结果可以看到nn.DataParallel()把原始模型包裹起来了,形成了新的DataParallel模块。然后我们就可以使用这个net来进行训练和预测了,它将自动在第0块GPU和第3块GPU上进行并行计算,然后自动把计算结果合并。


三、nn.DataParallel处理逻辑介绍

nn.DataParallel的处理流程如下:

  1. 若干块计算GPU:如图中GPU0~GPU2;还有1块梯度收集GPU(AllReduce操作所在GPU)。
  2. 复制模型:在每块计算GPU上都拷贝一份完整的模型参数。
  3. 分发数据:把一份数据X(例如一个batch)均匀分给不同的计算GPU。
  4. 并行计算:每块计算GPU做一轮前向传播(FWD)和反向传播(BWD)后,算得一份梯度G。
  5. 梯度聚合:每块计算GPU将自己的梯度push给梯度收集GPU,做聚合操作(一般指梯度累加,也支持用户自定义)。
  6. 参数更新:梯度收集GPU聚合完毕后,计算GPU从它那pull下完整的梯度结果,用于更新模型参数W。更新完毕后,计算GPU上的模型参数依然保持一致。
  7. AllReduce:聚合再下发梯度的操作,称为AllReduce。

流程图解

                    AllReduce (梯度聚合)

    ┌─────────────────┼─────────────────┐
    │                 │                 │
  Push G0          Push G1          Push G2
    │                 │                 │
+-------------+  +-------------+  +-------------+
|   GPU 0     |  |   GPU 1     |  |   GPU 2     |
|  G0→FWD/BWD |  |  G1→FWD/BWD |  |  G2→FWD/BWD |
|    W0       |  |    W1       |  |    W2       |
+-------------+  +-------------+  +-------------+
    ↑                 ↑                 ↑
    └─────────────────┼─────────────────┘

              数据分发 X → X0, X1, X2

📝 通俗解释:这个流程就像一个小组做作业,每个组员(GPU)都有一份完整的参考答案(模型副本),老师把作业题目(数据)平均分给每个人,大家各自完成后把答案(梯度)交给课代表(梯度收集GPU),课代表汇总大家的答案求出平均分,再把结果发回给每个组员,最后大家用更新后的参考答案继续做下一题。

补充说明

实现DP的一种经典编程框架叫"参数服务器",在这个框架里,计算GPU称为Worker,梯度聚合GPU称为Server。在实际应用中,为了尽量减少通讯量,一般可选择一个Worker同时作为Server(比如把梯度发到GPU0上做聚合)。

需要注意:

  • 1个Worker或者Server下可以不止1块GPU
  • Server可以只做梯度聚合,也可以同时做梯度聚合+全量参数更新

四、nn.DataParallel常见问题及解答

4.1 多GPU计算是否减少了程序运行时间?

虽然使用nn.DataParallel能够进行多GPU运算,但有时会导致程序花费的时间不减反增,这是为什么呢?

原因是:nn.DataParallel会将每个batch数据平均分配到不同device上进行并行计算,计算完再返回来合并。这导致GPU之间的开关和通讯过程占了大部分的时间开销。

📝 通俗解释:就像5个人一起搬砖,如果砖块数量不多,5个人来回交接砖的时间可能比一个人直接搬还慢。GPU之间的数据传输和同步就是"交接砖"的时间。

检测方法:可以使用watch -n 1 nvidia-smi命令来查看每秒各个GPU的运行情况。如果发现每个GPU的占用率均低于50%,基本可以肯定使用多GPU计算所花的时间要比单GPU计算更长。

4.2 如何保存和加载多GPU训练模型?

nn.DataParallel会在每个device上复制一个模型,这样就导致训练的不是一个模型,而是多个模型。那么如何保存和加载这些模型呢?

python
# 先构造一个网络
net = torch.nn.Linear(10, 1)
# 使用DataParallel包裹起来
net = torch.nn.DataParallel(net, device_ids=[0, 3])
# 保存网络(注意需要使用net.module)
torch.save(net.module.state_dict(), './networks/multiGPU.h5')

# 加载网络
new_net = torch.nn.Linear(10, 1)
new_net.load_state_dict(torch.load("./networks/multiGPU.h5"))

📝 通俗解释:因为DataParallel实际上是一个nn.Module包装器,真正的模型被藏在module属性里面。所以保存时要调用net.module来获取实际的模型,类似于从包装盒里取出真正的商品。

4.3 为什么第一块卡的显存占用更多?

最后一个参数output_device一般情况下是省略不写的,那么默认就是在device_ids[0],也就是第一块卡上。这就解释了为什么第一块卡的显存会比其他卡占用更多。

进一步说,当调用nn.DataParallel的时候,只是input数据是并行的,但output loss不是并行的。每次都会在第一块GPU上相加计算,这就造成了第一块GPU的负载远远大于其他显卡。

📝 通俗解释:就像一个小组长(GPU 0),不仅要和其他组员一样做题(计算),还要负责收作业、改卷子、发答案(汇总和分发结果),自然比其他组员更忙、更累。

4.4 训练时出现warning的原因

运行时会遇到如下警告:

UserWarning: Was asked to gather along dimension 0, but all input tensors were scalars;
will instead unsqueeze and return a vector.

原因说明

每张卡上的loss都是要汇总到第0张卡上求梯度,更新好以后把权重分发到其余卡。为什么会出现这个warning?

这与nn.DataParallel中最后一个参数dim有关。dim表示tensors被分散的维度,默认是0。nn.DataParallel将在dim=0(批处理维度)中对数据进行分块,并将每个分块发送到相应的设备。

单卡没有这个warning,多卡训练时会出现这个warning。由于计算loss时是分别在多卡计算的,返回的也就是多个loss。使用多少个GPU,就会返回多少个loss。

解决方案

  1. 使用size_average=False, reduce=True作为参数。每个GPU上的损失将相加,但不除以GPU上的批大小。然后将所有并行损耗相加,除以整批的大小。那么不管几块GPU,最终得到的平均loss都是一样的。

  2. PyTorch贡献者也实现了通过gather方式求loss平均的功能:

    • 如果在有2个GPU的系统上运行,DP将采用多GPU路径,调用gather并返回一个向量
    • 如果运行时只有1个GPU可见,DP将采用顺序路径,完全忽略gather,返回一个标量

📝 通俗解释:这个warning就像老师让每个小组长汇报平均分,但有的小组长只汇报了一个数字(有的小组只有1个人),有的小组长汇报了一组数字(有的小组有多个人),导致最后汇总时格式不一致。PyTorch后来修复了这个问题,让不同情况下返回的格式统一。

4.5 device_ids[0]被占用问题

在运行DataParallel模块之前,并行化模块必须在device_ids[0]上具有其参数和缓冲区。在执行DataParallel之前,会首先把模型的参数放在device_ids[0]上。

问题场景:服务器是八卡的服务器,刚好序号为0的卡被别人占用着,于是只能用其他的卡,比如用2和3号卡。如果直接指定device_ids=[2, 3]会出现模型初始化错误,类似于module没有复制到device_ids[0]上。

解决方案

python
import os

# 这两行代码需要放在import packages之后、DataParallel使用之前
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2, 3"

设置这两行代码后:

  • 对这个程序而言,可见GPU只有2和3号卡
  • 逻辑上对应0和1号卡
  • device_ids[0]对应第2号卡,device_ids[1]对应第3号卡
  • 模型会初始化在第2号卡上,不会占用第0号卡

📝 通俗解释:这就像你在一个有很多座位的教室里,但只想用第3排和第4排的座位。通过设置环境变量,你可以告诉程序"把第3排当作第1排,第4排当作第2排",这样程序就会在你指定的位置工作了。

优化器也可以使用DataParallel

python
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
optimizer = nn.DataParallel(optimizer, device_ids=device_ids)

五、nn.DataParallel参数更新方式

nn.DataParallel的参数更新流程如下:

  1. DataLoader把数据通过多个worker读到主进程的内存中
  2. 通过tensor的split语义,将一个batch的数据切分成多个更小的batch,然后分别送往不同的CUDA设备
  3. 在不同的CUDA设备上完成前向计算,网络的输出被gather到主CUDA设备上(初始化时使用的设备),loss而后在这里被计算出来
  4. loss然后被scatter到每个CUDA设备上,每个CUDA设备通过BP计算得到梯度
  5. 然后每个CUDA设备上的梯度被reduce到主CUDA设备上,然后模型权重在主CUDA设备上获得更新
  6. 在下一次迭代之前,主CUDA设备将模型参数broadcast到其它CUDA设备上,完成权重参数值的同步

📝 通俗解释:这个流程就像:老师发卷子(scatter数据)→ 学生各自做题(前向计算)→ 学生把答案交给课代表(gather输出)→ 课代表改卷子算总分(计算loss)→ 发回给学生(scatter loss)→ 学生算梯度(反向传播)→ 课代表收梯度(reduce)→ 课代表更新答案(更新参数)→ 课代表把正确答案发给所有学生(broadcast参数)。

分布式通信原语说明

  • Broadcast(广播):主进程将相同的数据分发给组里的每一个其它进程
  • Scatter(分散):主进程将数据的每一小部分分给组里的其它进程
  • Gather(收集):将其它进程的数据收集过来
  • Reduce(归约):将其它进程的数据收集过来并应用某种操作(比如SUM)
  • All-Reduce(全归约):多对多的reduce
  • All-Gather(全收集):多对多的gather

六、nn.DataParallel优点介绍

nn.DataParallel是PyTorch提供的数据并行方式,适用于单机多GPU的情况,使用非常方便,只需要在模型前加上nn.DataParallel即可。

主要优点

  • 使用简单:只需要一行代码即可实现多GPU并行
  • 易于理解:逻辑清晰,适合入门学习
  • 充分利用多GPU:能够有效利用多个GPU进行训练

📝 通俗解释:nn.DataParallel就像一个"一键启动多显卡"的开关,只需要把模型放进去,它自动帮你把任务分配到多个GPU上执行。


七、nn.DataParallel缺点介绍

  • 内存占用大:nn.DataParallel会将整个模型复制到每个GPU上,因此需要占用大量的GPU内存。当模型非常大时,可能会导致内存不足。

  • 数据通信开销大:nn.DataParallel使用的是数据并行方式,需要将每个GPU上的梯度进行汇总,因此需要进行大量的数据通信,可能会导致训练速度下降。

  • 不支持分布式:要求所有的GPU都在同一个节点上,无法跨机器分布式训练。

  • 不支持混合精度训练:不能使用Apex进行混合精度训练。

📝 通俗解释:这些缺点可以理解为:复制模型就像每个学生都需要买一本完整的教材(内存浪费);频繁交换答案就像课代表不停跑腿收作业、发作业(通信开销);只能同节点就像一个班的学生不能和另一个班合作(无法分布式);不支持混合精度就像不能用参考答案的简化版(无法用低精度加速)。


八、nn.DataParallel实战代码

使用torch.nn.DataParallel()进行分布式训练时,需要注意以下几点:

1. 模型加载到GPU的时机

model的处理顺序最好遵循以下步骤:

  • 首先获取模型(get model)
  • 然后模型移动到设备(model.to(device))
  • 最后使用DP(use DataParallel)

📝 补充:model一般不是纯model,而是model_with_loss。也就是说获取模型时,获取的模型不仅仅只有网络模型,同时也包含用于计算的loss的模型(比如检测头)。

2. 训练数据加载到GPU的时机

用于训练的数据(batch)中,不仅含有网络的输入数据input,大部分情况下也含有用于训练的标记数据target(batch由input和target两类数据构成)。

加载batch数据到GPU器件中的时机,最好选择在训练的一个iter过程中model进行forward之前,完成batch数据加载到器件(也就是to(device))的操作。

📝 补充:整个train的过程包含很多个epoch,而一个epoch的过程又包含很多个iter。一个epoch过程的iter数量等于:数据集样本数量 / batch_size。

3. batch_size与GPU数量的对应

一般而言batch_size的数量大于或等于GPU的数量。如果batch_size大于GPU的数量,则要保证batch_size可以被GPU数整除。

如果batch_size不能被GPU数整除,需要进行以下操作:

  • 确定主GPU分得的样本数量
  • 确定其他GPU所分得的样本数量
  • 对data_parallel进行改写,添加chunk_size的功能(具体参见CenterTrack项目中的data_parallel.py)

4. DataParallel参数设置

参数说明
module用于并行训练的model,使用前model必须已经被加载到device[0]中
device_ids参与训练的GPU id
output_device指定用于汇总梯度的GPU id,默认为device_ids[0]
dim默认为0,一般不需要改动

📝 通俗解释:官方要求model必须先加载到device_ids[0],这就像让小组长(GPU 0)先准备好教材,其他组员才能从组长那里拷贝。

完整训练代码示例

python
import torch
import torch.nn as nn
import torch.optim as optim

# 设置可见GPU
gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))

# 构建训练数据集
train_dataset = ...  # build train dataset
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)
# batch_size的设置最好要能被GPU数量整除,如果不能,则要改动data_parallel

# 获取模型
model = ...  # get model or you can get model-with-loss
# 模型移动到设备并使用DP
model = nn.DataParallel(model.to(device), device_ids=gpus, output_device=gpus[0])

# 获取优化器
optimizer = optim.SGD(model.parameters(), lr=lr)

# 训练循环
for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        # 一般这里可以合并为一个功能,输出的是一个batch包括输入数据input和训练标记数据target
        
        output = model(images)
        loss = criterion(output, target)
        # 一般这里也可以合并为一个功能,将model和loss合并为整体的模型modelwithloss,然后同时输出output和loss
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

📝 通俗解释:这段代码就是一个完整的多GPU训练流程:设置GPU → 准备数据 → 准备模型 → 准备优化器 → 循环训练。其中关键就是在模型后面加上nn.DataParallel(),让PyTorch自动把任务分配到多块GPU上执行。

基于 MIT 许可发布