PyTorch 使用 torch.nn.DataParallel 来实现基于数据并行的单机、多 GPU 并行训练。使用 DP 方式在多 GPU 上训练模型,需要保证模型能够在每个 GPU 上放得下,训练过程中会把整个模型都复制到每个 GPU 内,通过数据并行的方式提高训练模型的速度。
虽然 DataParallel 使用起来非常容易,但是通常不能够获得最佳的性能。DataParallel 在每一轮的前向传播过程中,会复制一遍模型,同时这种基于单进程、多线程的并行方式也会存在 GIL(Global Interpreter Lock) 竞争问题。
DP 数据并行训练流程
下面我们分析一下 DP 数据并行模式在多 GPU 的情况下训练模型的基本流程,如下图所示:
基于 DP 模式,模型训练的基本过程分为三个阶段,描述如下:
前向传播计算过程
- 1.Scatter mini-batch inputs to GPUs
我们通过指定 batch_size 大小,对输入的训练数据集进行了分割,在训练神经网络模型的前向传播过程中,会将每一个小批数据(Mini-Batch)分发到每一个 GPU 上。具体过程是:将 4 个小批数据 i1 ~ i4 复制到 GPU-1 上,再将 4 个小批数据分别发送到 GPU-1 ~ GPU-4 上。
- 2.Replicate model on GPUs
将整个模型都拷贝到所有的 GPU 上,每个 GPU 上保存模型的一个完整的副本,所以要求整个模型必须能够放到每个 GPU 上才能进行训练。这样,每个 GPU 上既有了模型,又有了对应的小批训练数据,就可以在 GPU 本地执行一轮的前向传播计算过程了。
- 3.Parallel forward pass
这时,4 个 GPU 分别在 GPU 本地独立运行前向传播计算过程,这个过程就实现了并行训练。每个 GPU 执行计算并输出结果,将结果缓存在自己的 GPU 内,以供接下来计算 Loss 使用。
Loss 计算过程
- 1.Scatter mini-batch labels to GPUs
每个 GPU 计算的结果数据 o1 ~ o4,会汇集到 GPU-1(即第一个 GPU)上,然后在 GPU-1 上使用前面每个 GPU 计算的结果数据,计算得到所有的 Label 结果并将 Label 结果分发到每个 GPU 上,为下面并行计算 Loss 提供数据。这里,相当于在 GPU-1 上调用模型的 pred = model(X) 进行计算。
- 2.Parallel loss computation
每个 GPU 上都有了 Label 结果数据,以及之前缓存的结果 o1 ~ o4,接着就可以并行计算 Loss,分别得到 loss1 ~ loss4。这里,相当于在每个 GPU 上调用模型的 loss = loss_fn(pred, y) 计算得到 Loss 结果。
反向传播计算过程
- 1.Compute loss gradients on GPUs
每个 GPU 上都有自己计算得到的 Loss 结果,基于该结果并行计算 Loss 的梯度。这里,相当于在每个 GPU 上调用模型的 loss.backward() 进行计算,计算结果 grad_l1 ~ grad_l4 分别保存在每个 GPU 自己本地。
- 2.Parallel backward pass
在每个 GPU 上分别计算模型参数的梯度 grad1_m ~ grad4_m,这个过程实现了多 GPU 并行计算。
- 3.Reduce gradients to GPU-1
根据每个 GPU 上得到的梯度结果数据,分别发送到 GPU-1 上进行统一的 Reduce 计算,对每个 GPU 上计算得到的模型参数梯度进行累加计算:grad1_m + grad2_m + grad3_m + grad4_m,这样就得到了一次迭代模型的参数变化梯度值,可以使用新结果直接对模型进行更新。
通过上面三个阶段的计算,我们使用每个 GPU 分别处理了一批数据,这时在 GPU-1 上得到了新的模型参数。也就是说,目前每个 GPU 上模型参数已经不是最新的。为了后面继续实现数据并行计算,在继续训练模型之前需要将 GPU-1 上新的模型参数同步更新到每个 GPU 上。这对应到上面描述的 “前向传播计算过程” 阶段第 2 步要进行的处理。
PyTorch DP 数据并行 API
PyTorch 实现的 DP 数据并行 API,可以直接使用 nn.DataParallel 来 wrap 我们实现的 nn.Module 模型即可,nn.DataParallel 定义如下所示:
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
其中各个参数的含义,说明如下:
- module
module 是我们自己实现的继承自 nn.Module 模型类,使用 DP 实现数据并行,不需要对模型进行任何修改就可以使用。但是,需要保证我们的模型能够放到每个 GPU 内,否则就无法使用。
- device_ids:
指定我们使用的 GPU 设备列表,例如 device_ids=[0, 1, 2],或者 device_ids=['cuda:0','cuda:1', 'cuda:2']。如果不设置该值,就会使用主机上所有的 GPU。
- output_device:
指定结果输出的设备位置,默认是 device_ids[0]。
- dim:
对 Tensor 进行分发所使用的 GPU 设备,默认 dim=0,即在 device_ids[0] 进行 Tensor 的分发。
DP 训练模型实例
使用 DP 模式训练模型,可以直接 warp 一下我们实现的模型,并指定使用 GPU,就可以进行训练,示例核心代码如下:
model = Model(input_size, output_size) if torch.cuda.device_count() > 1: print("Let's use", torch.cuda.device_count(), "GPUs!") # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs model = nn.DataParallel(model) model.to(device)
我们可以简单实践一下,直接使用 DP 实现数据并行,示例代码如下所示:
import torch from torch import nn from torch.utils.data import DataLoader from torchvision import datasets from torchvision.transforms import ToTensor # Download training data from open datasets. training_data = datasets.FashionMNIST( root="data", train=True, download=True, transform=ToTensor(), ) # Create data loaders. batch_size = 64 train_dataloader = DataLoader(training_data, batch_size=batch_size) # Get cpu, gpu or mps device for training. device = ( "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu" ) print(f"Using {device} device") # Define model class NeuralNetwork(nn.Module): def __init__(self): super().__init__() self.flatten = nn.Flatten() self.linear_relu_stack = nn.Sequential( nn.Linear(28*28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10) ) def forward(self, x): x = self.flatten(x) # print(f"shape: {x.size()}") logits = self.linear_relu_stack(x) return logits model = NeuralNetwork().to(device) if torch.cuda.device_count() > 1: print("Let's use", torch.cuda.device_count(), "GPUs!") model = nn.DataParallel(model) else : print("GPU count: ", torch.cuda.device_count()) model.to(device) # Define loss function and optimizer loss_fn = nn.CrossEntropyLoss() optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # training loop def train(dataloader, model, loss_fn, optimizer): size = len(dataloader.dataset) model.train() for batch, (X, y) in enumerate(dataloader): X, y = X.to(device), y.to(device) # Compute prediction error pred = model(X) loss = loss_fn(pred, y) # Backpropagation loss.backward() optimizer.step() optimizer.zero_grad() if batch % 100 == 0: loss, current = loss.item(), (batch + 1) * len(X) print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]") # Train the DP model train(train_dataloader, model, loss_fn, optimizer)
使用 DP 的数据并行方式,会根据单机上 GPU 个数或者自己配置使用的 GPU 组,将输入数据自动分割,并分发到每一个 GPU 上进行模型训练。当每个 GPU 上的数据训练计算结束之后,DataParallel 会自动收集结果、合并结果,最终输出累加后的结果。所以在实际训练过程中,对于进行分发数据和聚合结果的 GPU 设备会有更高的使用率,相比于其它几个 GPU 可能会更容易出现问题,比如内存使用 OOM 问题等。
在一些要求不高的场景下可以直接使用 DP 数据并行模式,比如调试测试模型,不建议在正式生产环境中直接使用 DP 数据并行模式训练模式。
参考资源
- https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html
- https://pytorch.org/tutorials/beginner/former_torchies/parallelism_tutorial.html
- https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。