PyTorch 使用 DP 模式实现数据并行

PyTorch 使用 torch.nn.DataParallel 来实现基于数据并行的单机、多 GPU 并行训练。使用 DP 方式在多 GPU 上训练模型,需要保证模型能够在每个 GPU 上放得下,训练过程中会把整个模型都复制到每个 GPU 内,通过数据并行的方式提高训练模型的速度。
虽然 DataParallel 使用起来非常容易,但是通常不能够获得最佳的性能。DataParallel 在每一轮的前向传播过程中,会复制一遍模型,同时这种基于单进程、多线程的并行方式也会存在 GIL(Global Interpreter Lock) 竞争问题。

DP 数据并行训练流程

下面我们分析一下 DP 数据并行模式在多 GPU 的情况下训练模型的基本流程,如下图所示:
DP-Interaction-Process
基于 DP 模式,模型训练的基本过程分为三个阶段,描述如下:

前向传播计算过程

  • 1.Scatter mini-batch inputs to GPUs

我们通过指定 batch_size 大小,对输入的训练数据集进行了分割,在训练神经网络模型的前向传播过程中,会将每一个小批数据(Mini-Batch)分发到每一个 GPU 上。具体过程是:将 4 个小批数据 i1 ~ i4 复制到 GPU-1 上,再将 4 个小批数据分别发送到 GPU-1 ~ GPU-4 上。

  • 2.Replicate model on GPUs

将整个模型都拷贝到所有的 GPU 上,每个 GPU 上保存模型的一个完整的副本,所以要求整个模型必须能够放到每个 GPU 上才能进行训练。这样,每个 GPU 上既有了模型,又有了对应的小批训练数据,就可以在 GPU 本地执行一轮的前向传播计算过程了。

  • 3.Parallel forward pass

这时,4 个 GPU 分别在 GPU 本地独立运行前向传播计算过程,这个过程就实现了并行训练。每个 GPU 执行计算并输出结果,将结果缓存在自己的 GPU 内,以供接下来计算 Loss 使用。

Loss 计算过程

  • 1.Scatter mini-batch labels to GPUs

每个 GPU 计算的结果数据 o1 ~ o4,会汇集到 GPU-1(即第一个 GPU)上,然后在 GPU-1 上使用前面每个 GPU 计算的结果数据,计算得到所有的 Label 结果并将 Label 结果分发到每个 GPU 上,为下面并行计算 Loss 提供数据。这里,相当于在 GPU-1 上调用模型的 pred = model(X) 进行计算。

  • 2.Parallel loss computation

每个 GPU 上都有了 Label 结果数据,以及之前缓存的结果 o1 ~ o4,接着就可以并行计算 Loss,分别得到 loss1 ~ loss4。这里,相当于在每个 GPU 上调用模型的 loss = loss_fn(pred, y) 计算得到 Loss 结果。

反向传播计算过程

  • 1.Compute loss gradients on GPUs

每个 GPU 上都有自己计算得到的 Loss 结果,基于该结果并行计算 Loss 的梯度。这里,相当于在每个 GPU 上调用模型的 loss.backward() 进行计算,计算结果 grad_l1 ~ grad_l4 分别保存在每个 GPU 自己本地。

  • 2.Parallel backward pass

在每个 GPU 上分别计算模型参数的梯度 grad1_m ~ grad4_m,这个过程实现了多 GPU 并行计算。

  • 3.Reduce gradients to GPU-1

根据每个 GPU 上得到的梯度结果数据,分别发送到 GPU-1 上进行统一的 Reduce 计算,对每个 GPU 上计算得到的模型参数梯度进行累加计算:grad1_m + grad2_m + grad3_m + grad4_m,这样就得到了一次迭代模型的参数变化梯度值,可以使用新结果直接对模型进行更新。

通过上面三个阶段的计算,我们使用每个 GPU 分别处理了一批数据,这时在 GPU-1 上得到了新的模型参数。也就是说,目前每个 GPU 上模型参数已经不是最新的。为了后面继续实现数据并行计算,在继续训练模型之前需要将 GPU-1 上新的模型参数同步更新到每个 GPU 上。这对应到上面描述的 “前向传播计算过程” 阶段第 2 步要进行的处理。

PyTorch DP 数据并行 API

PyTorch 实现的 DP 数据并行 API,可以直接使用 nn.DataParallel 来 wrap 我们实现的 nn.Module 模型即可,nn.DataParallel 定义如下所示:

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

其中各个参数的含义,说明如下:

  • module

module 是我们自己实现的继承自 nn.Module 模型类,使用 DP 实现数据并行,不需要对模型进行任何修改就可以使用。但是,需要保证我们的模型能够放到每个 GPU 内,否则就无法使用。

  • device_ids:

指定我们使用的 GPU 设备列表,例如 device_ids=[0, 1, 2],或者 device_ids=['cuda:0','cuda:1', 'cuda:2']。如果不设置该值,就会使用主机上所有的 GPU。

  • output_device:

指定结果输出的设备位置,默认是 device_ids[0]。

  • dim:

对 Tensor 进行分发所使用的 GPU 设备,默认 dim=0,即在 device_ids[0] 进行 Tensor 的分发。

DP 训练模型实例

使用 DP 模式训练模型,可以直接 warp 一下我们实现的模型,并指定使用 GPU,就可以进行训练,示例核心代码如下:

model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
  print("Let's use", torch.cuda.device_count(), "GPUs!")
  # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
  model = nn.DataParallel(model)

model.to(device)

我们可以简单实践一下,直接使用 DP 实现数据并行,示例代码如下所示:

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

# Download training data from open datasets.
training_data = datasets.FashionMNIST(
  root="data", train=True, download=True, transform=ToTensor(),
)

# Create data loaders.
batch_size = 64
train_dataloader = DataLoader(training_data, batch_size=batch_size)

# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        # print(f"shape: {x.size()}")
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)

if torch.cuda.device_count() > 1:
  print("Let's use", torch.cuda.device_count(), "GPUs!")
  model = nn.DataParallel(model)
else :
  print("GPU count: ", torch.cuda.device_count())

model.to(device)

# Define loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# training loop
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

# Train the DP model
train(train_dataloader, model, loss_fn, optimizer)

使用 DP 的数据并行方式,会根据单机上 GPU 个数或者自己配置使用的 GPU 组,将输入数据自动分割,并分发到每一个 GPU 上进行模型训练。当每个 GPU 上的数据训练计算结束之后,DataParallel 会自动收集结果、合并结果,最终输出累加后的结果。所以在实际训练过程中,对于进行分发数据和聚合结果的 GPU 设备会有更高的使用率,相比于其它几个 GPU 可能会更容易出现问题,比如内存使用 OOM 问题等。
在一些要求不高的场景下可以直接使用 DP 数据并行模式,比如调试测试模型,不建议在正式生产环境中直接使用 DP 数据并行模式训练模式。

参考资源

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>