PyTorch的并行与分布式训练深度解析
PyTorch的并行与分布式训练深度解析
在深度学习任务中,模型规模不断增大、数据量越来越多,单张 GPU 难以满足计算和内存需求。PyTorch 提供了一整套并行和分布式训练的方法,既能在单机多 GPU 上加速训练,也能跨多机多 GPU 做大规模并行训练。本文从原理、代码示例、图解和实践细节出发,帮助你深入理解 PyTorch 的并行与分布式训练体系,并快速上手。
目录
- 并行 vs 分布式:基本概念
- 2.1
torch.nn.DataParallel
原理与示例 - 2.2
DataParallel
的性能瓶颈
- 2.1
分布式训练基本原理:
DistributedDataParallel (DDP)
- 3.1 进程与设备映射、通信后端
- 3.2 典型通信流程(梯度同步的 All-Reduce)
- 3.3 进程组初始化与环境变量
- 4.1 代码示例:最简单的 DDP Script
- 4.2 启动方式:
torch.distributed.launch
与torchrun
- 4.3 训练流程图解
- 5.1 集群环境准备(SSH 无密码登录、网络连通性)
- 5.2 环境变量与初始化(
MASTER_ADDR
、MASTER_PORT
、WORLD_SIZE
、RANK
) - 5.3 代码示例:跨主机 DDP 脚本
- 5.4 多机 DDP 流程图解
- 6.1 混合精度训练与梯度累积
- 6.2 模型切分(
torch.distributed.pipeline.sync.Pipe
) - 6.3 异步数据加载与
DistributedSampler
- 6.4 NCCL 参数调优与网络优化
- 7.1 代码结构一览
- 7.2 核心脚本详解
- 7.3 训练流程示意
- 常见问题与调试思路
- 总结
并行 vs 分布式基本概念
- 并行(Parallel):通常指在同一台机器上,使用多张 GPU(或多张卡)同时进行计算。PyTorch 中的
DataParallel
、DistributedDataParallel
(当world_size=1
)都能实现单机多卡并行。 - 分布式(Distributed):指跨多台机器(node),每台机器可能有多张 GPU,通过网络进行通信,实现大规模并行训练。PyTorch 中的
DistributedDataParallel
正是为了多机多卡场景设计。
- 数据并行(Data Parallelism):每个进程或 GPU 拥有一个完整的模型副本,将 batch 切分成若干子 batch,分别放在不同设备上计算 forward 和 backward,最后在所有设备间同步(通常是梯度的 All-Reduce),再更新各自的模型。PyTorch DDP 默认就是数据并行方式。
- 模型并行(Model Parallelism):将一个大模型切分到不同设备上执行,每个设备负责模型的一部分,数据在不同设备上沿网络前向或后向传播。这种方式更复杂,本文主要聚焦数据并行。
备注:简单地说,单机多 GPU 并行是并行;跨机多 GPU 同时训练就是分布式(当然还是数据并行,只不过通信跨网络)。
单机多 GPU 并行:DataParallel
与其局限
2.1 torch.nn.DataParallel
原理与示例
PyTorch 提供了 torch.nn.DataParallel
(DP)用于单机多卡并行。使用方式非常简单:
import torch
import torch.nn as nn
import torch.optim as optim
# 假设有 2 张 GPU:cuda:0、cuda:1
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# 定义模型
class SimpleNet(nn.Module):
def __init__(self):
super(SimpleNet, self).__init__()
self.fc = nn.Linear(1000, 10)
def forward(self, x):
return self.fc(x)
# 实例化并包装 DataParallel
model = SimpleNet().to(device)
model = nn.DataParallel(model)
# 定义优化器、损失函数
optimizer = optim.SGD(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
# 训练循环示例
for data, target in dataloader: # 假设 dataloader 生成 [batch_size, 1000] 的输入
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
outputs = model(data) # DataParallel 自动将 data 切分到多卡
loss = criterion(outputs, target)
loss.backward() # 梯度会聚合到主设备(默认是 cuda:0)
optimizer.step()
执行流程图解(单机 2 张 GPU):
┌─────────────────────────────────────────────────────────┐
│ 主进程 (cuda:0) │
│ - 构建模型副本1 -> 放在 cuda:0 │
│ - 构建模型副本2 -> 放在 cuda:1 │
│ - dataloader 生成一个 batch [N, …] │
└─────────────────────────────────────────────────────────┘
│
│ DataParallel 负责将输入拆分为两份
▼
┌───────────────────────┐ ┌───────────────────────┐
│ 子进程 GPU0 (rank0) │ │ 子进程 GPU1 (rank1) │
│ 输入 slice0 │ │ 输入 slice1 │
│ forward -> loss0 │ │ forward -> loss1 │
│ backward (计算 grad0) │ │ backward (计算 grad1) │
└───────────────────────┘ └───────────────────────┘
│ │
│ 梯度复制到主 GPU │
└───────────┬────────────┘
▼
┌─────────────────────────────────┐
│ 主进程在 cuda:0 聚合所有 GPU 的梯度 │
│ optimizer.step() 更新权重到各卡 │
└─────────────────────────────────┘
- 优点:使用极其简单,无需手动管理进程;输入切分、梯度聚合由框架封装。
局限:
- 单进程多线程:DataParallel 在主进程中用多线程(其实是异步拷贝)驱动多个 GPU,存在 GIL(全局解释器锁)和 Python 进程内瓶颈。
- 通信瓶颈:梯度聚合通过主 GPU(cuda:0)做收集,形成通信热点;随着 GPU 数量增加,cuda:0 会成为性能瓶颈。
- 负载不均衡:如果 batch size 不能整除 GPU 数量,DataParallel 会自动将多余样本放到最后一个 GPU,可能导致部分 GPU 负载更重。
因此,虽然 DataParallel
简单易用,但性能上难以大规模扩展。PyTorch 官方推荐在单机多卡时使用 DistributedDataParallel
代替 DataParallel
。
2.2 DataParallel
的性能瓶颈
- 梯度集中(Bottleneck):所有 GPU 的梯度必须先传到主 GPU,主 GPU 聚合后再广播更新的参数,通信延迟和主 GPU 计算开销集中在一处。
- 线程调度开销:尽管 PyTorch 通过 C++ 异步拷贝和 Kernels 优化,但 Python GIL 限制使得多线程调度、数据拷贝容易引发等待。
- 少量 GPU 数目适用:当 GPU 数量较少(如 2\~4 块)时,
DataParallel
的性能损失不很明显,但当有 8 块及以上 GPU 时,就会严重拖慢训练速度。
分布式训练基本原理:DistributedDataParallel (DDP)
DistributedDataParallel
(简称 DDP)是 PyTorch 推荐的并行训练接口。不同于 DataParallel
,DDP 采用单进程单 GPU或单进程多 GPU(少见)模式,每个 GPU 都运行一个进程(进程中只使用一个 GPU),通过高效的 NCCL 或 Gloo 后端实现多 GPU 或多机间的梯度同步。
3.1 进程与设备映射、通信后端
- 进程与设备映射:DDP 通常为每张 GPU 启动一个进程,并在该进程中将
model.to(local_rank)
(local_rank
指定该进程绑定的 GPU 下标)。这种方式绕过了 GIL,实现真正的并行。 主机(node)与全局进程编号:
world_size
:全局进程总数 =num_nodes × gpus_per_node
。rank
:当前进程在全局中的编号,范围是[0, world_size-1]
。local_rank
:当前进程在本地机器(node)上的 GPU 下标,范围是[0, gpus_per_node-1]
。
通信后端(backend):
- NCCL(NVIDIA Collective Communications Library):高效的 GPU-GPU 通信后端,支持多 GPU、小消息和大消息的优化;一般用于 GPU 设备间。
- Gloo:支持 CPU 或 GPU,适用于小规模测试或没有 GPU NCCL 环境时。
- MPI:也可通过 MPI 后端,但这需要系统预装 MPI 实现,一般在超级计算集群中常见。
3.2 典型通信流程(梯度同步的 All-Reduce)
在 DDP 中,每个进程各自完成 forward 和 backward 计算——
- Forward:每个进程将本地子 batch 放到 GPU 上,进行前向计算得到 loss。
- Backward:在执行
loss.backward()
时,DDP 会在各个 GPU 计算得到梯度后,异步触发 All-Reduce 操作,将所有进程对应张量的梯度做求和(Sum),再自动除以world_size
或按需要均匀分发。 - 更新参数:所有进程会拥有相同的梯度,后续每个进程各自执行
optimizer.step()
,使得每张 GPU 的模型权重保持同步,无需显式广播。
All-Reduce 原理图示(以 4 个 GPU 为例):
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ GPU 0 │ │ GPU 1 │ │ GPU 2 │ │ GPU 3 │
│ grad0 │ │ grad1 │ │ grad2 │ │ grad3 │
└────┬──────┘ └────┬──────┘ └────┬──────┘ └────┬──────┘
│ │ │ │
│ a) Reduce-Scatter Reduce-Scatter │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ chunk0_0 │ │ chunk1_1 │ │ chunk2_2 │ │ chunk3_3 │
└───────────┘ └───────────┘ └───────────┘ └───────────┘
│ │ │ │
│ b) All-Gather All-Gather │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ sum_grad0 │ │ sum_grad1 │ │ sum_grad2 │ │ sum_grad3 │
└───────────┘ └───────────┘ └───────────┘ └───────────┘
- Reduce-Scatter:将所有 GPU 的梯度分成若干等长子块(chunk0, chunk1, chunk2, chunk3),每个 GPU 负责汇聚多卡中对应子块的和,放入本地。
- All-Gather:各 GPU 将自己拥有的子块广播给其他 GPU,最终每个 GPU 都能拼接到完整的
sum_grad
。
最后,每个 GPU 拥有的 sum_grad
都是所有进程梯度的求和结果;如果开启了 average
模式,就已经是平均梯度,直接用来更新参数。
3.3 进程组初始化与环境变量
初始化:在每个进程中,需要调用
torch.distributed.init_process_group(backend, init_method, world_size, rank)
,完成进程间的通信环境初始化。backend
:常用"nccl"
或"gloo"
。init_method
:指定进程组初始化方式,支持:- 环境变量方式(Env):最常见的做法,通过环境变量
MASTER_ADDR
(主节点 IP)、MASTER_PORT
(主节点端口)、WORLD_SIZE
、RANK
等自动初始化。 - 文件方式(File):在 NFS 目录下放一个 file://URI,适合单机测试或文件共享场景。
- TCP 方式(tcp\://):直接给出主节点地址,如
init_method='tcp://ip:port'
。
- 环境变量方式(Env):最常见的做法,通过环境变量
world_size
:总进程数。rank
:当前进程在总进程列表中的编号。
环境变量示例(假设 2 台机器,每台 4 GPU,总共 8 个进程):
主节点(rank 0 所在机器)环境:
export MASTER_ADDR=192.168.0.1 export MASTER_PORT=23456 export WORLD_SIZE=8 export RANK=0 # 对应第一个进程, 绑定本机 GPU Device 0 export LOCAL_RANK=0
同一机器上,接下来还要启动进程:
export RANK=1; export LOCAL_RANK=1 # 绑定 GPU Device 1 export RANK=2; export LOCAL_RANK=2 # 绑定 GPU Device 2 export RANK=3; export LOCAL_RANK=3 # 绑定 GPU Device 3
第二台机器(主节点地址相同,rank 从 4 到 7):
export MASTER_ADDR=192.168.0.1 export MASTER_PORT=23456 export WORLD_SIZE=8 export RANK=4; export LOCAL_RANK=0 # 本机 GPU0 export RANK=5; export LOCAL_RANK=1 # 本机 GPU1 export RANK=6; export LOCAL_RANK=2 # 本机 GPU2 export RANK=7; export LOCAL_RANK=3 # 本机 GPU3
在实际使用 torch.distributed.launch
(或 torchrun
)脚本时,PyTorch 会自动为你设置好这些环境变量,无需手动逐一赋值。
单机多 GPU 下使用 DDP
在单机多 GPU 场景下,我们一般用 torch.distributed.launch
或者新版的 torchrun
来一次性启动多个进程,每个进程对应一张 GPU。
4.1 代码示例:最简单的 DDP Script
下面给出一个最简版的单机多 GPU DDP 训练脚本 train_ddp.py
,以 MNIST 作为演示模型。
# train_ddp.py
import os
import argparse
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
def setup(rank, world_size):
"""
初始化进程组
"""
dist.init_process_group(
backend="nccl",
init_method="env://", # 根据环境变量初始化
world_size=world_size,
rank=rank
)
torch.cuda.set_device(rank) # 设置当前进程使用的 GPU
def cleanup():
dist.destroy_process_group()
class SimpleCNN(nn.Module):
def __init__(self):
super(SimpleCNN, self).__init__()
self.conv = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
self.relu = nn.ReLU()
self.fc = nn.Linear(32 * 28 * 28, 10)
def forward(self, x):
x = self.conv(x)
x = self.relu(x)
x = x.view(x.size(0), -1)
x = self.fc(x)
return x
def demo_ddp(rank, world_size, args):
print(f"Running DDP on rank {rank}.")
setup(rank, world_size)
# 构造模型并包装 DDP
model = SimpleCNN().cuda(rank)
ddp_model = DDP(model, device_ids=[rank])
# 定义优化器与损失函数
criterion = nn.CrossEntropyLoss().cuda(rank)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
# DataLoader: 使用 DistributedSampler
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, sampler=sampler)
# 训练循环
epochs = args.epochs
for epoch in range(epochs):
sampler.set_epoch(epoch) # 每个 epoch 需调用,保证打乱数据一致性
ddp_model.train()
epoch_loss = 0.0
for batch_idx, (data, target) in enumerate(dataloader):
data = data.cuda(rank, non_blocking=True)
target = target.cuda(rank, non_blocking=True)
optimizer.zero_grad()
output = ddp_model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
print(f"Rank {rank}, Epoch [{epoch}/{epochs}], Loss: {epoch_loss/len(dataloader):.4f}")
cleanup()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=3, help="number of total epochs to run")
args = parser.parse_args()
world_size = torch.cuda.device_count()
# 通过 torch.multiprocessing.spawn 启动多个进程
torch.multiprocessing.spawn(
demo_ddp,
args=(world_size, args),
nprocs=world_size,
join=True
)
if __name__ == "__main__":
main()
代码详解
setup(rank, world_size)
- 调用
dist.init_process_group(backend="nccl", init_method="env://", world_size, rank)
根据环境变量初始化通信组。 - 使用
torch.cuda.set_device(rank)
将当前进程绑定到对应编号的 GPU。
- 调用
模型与 DDP 封装
model = SimpleCNN().cuda(rank)
将模型加载至本地 GPUrank
。ddp_model = DDP(model, device_ids=[rank])
用 DDP 包装模型,device_ids
表明该进程使用哪个 GPU。
数据划分:
DistributedSampler
DistributedSampler
会根据rank
和world_size
划分数据集,确保各进程获取互斥的子集。- 在每个 epoch 调用
sampler.set_epoch(epoch)
以改变随机种子,保证多进程 shuffle 同步且不完全相同。
训练循环
- 每个进程的训练逻辑相同,只不过处理不同子集数据;
loss.backward()
时,DDP 内部会自动触发跨进程的 All-Reduce,同步每层参数在所有进程上的梯度。- 同步完成后,每个进程都可以调用
optimizer.step()
独立更新本地模型。由于梯度一致,更新后模型权重会保持同步。
启动方式
torch.multiprocessing.spawn
:在本脚本通过world_size = torch.cuda.device_count()
自动获取卡数,然后 spawn 多个进程;这种方式不需要使用torch.distributed.launch
。- 也可直接在命令行使用
torchrun
,并将ddp_model = DDP(...)
放在脚本中,根据环境变量自动分配 GPU。
4.2 启动方式:torch.distributed.launch
与 torchrun
方式一:使用 torchrun
(PyTorch 1.9+ 推荐)
# 假设单机有 4 张 GPU
# torchrun 会自动设置 WORLD_SIZE=4, RANK=0~3, LOCAL_RANK=0~3
torchrun --nnodes=1 --nproc_per_node=4 train_ddp.py --epochs 5
--nnodes=1
:单机。--nproc_per_node=4
:开启 4 个进程,每个进程对应一张 GPU。PyTorch 会为每个进程设置环境变量:
- 进程0:
RANK=0
,LOCAL_RANK=0
,WORLD_SIZE=4
- 进程1:
RANK=1
,LOCAL_RANK=1
,WORLD_SIZE=4
- …
- 进程0:
方式二:使用 torch.distributed.launch
(旧版)
python -m torch.distributed.launch --nproc_per_node=4 train_ddp.py --epochs 5
- 功能与
torchrun
基本相同,但launch
已被标记为即将弃用,新的项目应尽量转为torchrun
。
4.3 训练流程图解
┌──────────────────────────────────────────────────────────────────┐
│ 单机多 GPU DDP │
│ │
│ torchrun 启动 4 个进程 (rank = 0,1,2,3) │
│ 每个进程绑定到不同 GPU (cuda:0,1,2,3) │
└──────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ 进程0 │ │ 进程1 │ │ 进程2 │ │ 进程3 │
│ Rank=0 │ │ Rank=1 │ │ Rank=2 │ │ Rank=3 │
│ CUDA:0 │ │ CUDA:1 │ │ CUDA:2 │ │ CUDA:3 │
└──────┬─────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘
│ │ │ │
│ 同一Epoch sampler.set_epoch() 同步数据划分 │
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ 每个进程从 DistributedSampler 获得 子Batch │
│ 例如: BatchSize=64, world_size=4, 每进程 batch=16 │
└──────────────────────────────────────────────────┘
│ │ │ │
│ forward 计算每个子 Batch 的输出 │
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────────────────────────────────────────────┐
│ 所有进程 各自 执行 loss.backward() │
│ grad0 grad1 grad2 grad3 先各自计算本地梯度 │
└────────────────────────────────────────────────────────────────┘
│ │ │ │
│ DDP 触发 NCCL All-Reduce 梯度同步 │
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────────────────────────────────────────────┐
│ 每个进程 获得同步后的 “sum_grad” 或 “avg_grad” │
│ 然后 optimizer.step() 各自 更新 本地 模型参数 │
└────────────────────────────────────────────────────────────────┘
│ │ │ │
└─── 同时继续下一个 mini-batch │
- 每个进程独立负责自己 GPU 上的计算,计算完毕后异步进行梯度同步。
- 一旦所有 GPU 梯度同步完成,才能执行参数更新;否则 DDP 会在
backward()
过程中阻塞。
多机多 GPU 下使用 DDP
当需要跨多台机器训练时,我们需要保证各机器间的网络连通性,并正确设置环境变量或使用启动脚本。
5.1 集群环境准备(SSH 无密码登录、网络连通性)
SSH 无密码登录
- 常见做法是在各节点间配置 SSH 密钥免密登录,方便分发任务脚本、日志收集和故障排查。
网络连通性
- 确保所有机器可以相互 ping 通,并且
MASTER_ADDR
(主节点 IP)与MASTER_PORT
(开放端口)可访问。 - NCCL 环境下对 RDMA/InfiniBand 环境有特殊优化,但最基本的是每台机的端口可达。
- 确保所有机器可以相互 ping 通,并且
5.2 环境变量与初始化
假设有 2 台机器,每台机器 4 张 GPU,要运行一个 8 卡分布式训练任务。我们可以在每台机器上分别执行如下命令,或在作业调度系统中配置。
主节点(机器 A,IP=192.168.0.1)
# 主节点启动进程 0~3
export MASTER_ADDR=192.168.0.1
export MASTER_PORT=23456
export WORLD_SIZE=8
# GPU 0
export RANK=0
export LOCAL_RANK=0
# 启动第一个进程
python train_ddp_multi_machine.py --epochs 5 &
# GPU 1
export RANK=1
export LOCAL_RANK=1
python train_ddp_multi_machine.py --epochs 5 &
# GPU 2
export RANK=2
export LOCAL_RANK=2
python train_ddp_multi_machine.py --epochs 5 &
# GPU 3
export RANK=3
export LOCAL_RANK=3
python train_ddp_multi_machine.py --epochs 5 &
从节点(机器 B,IP=192.168.0.2)
# 从节点启动进程 4~7
export MASTER_ADDR=192.168.0.1 # 指向主节点
export MASTER_PORT=23456
export WORLD_SIZE=8
# GPU 0(在该节点上 rank=4)
export RANK=4
export LOCAL_RANK=0
python train_ddp_multi_machine.py --epochs 5 &
# GPU 1(在该节点上 rank=5)
export RANK=5
export LOCAL_RANK=1
python train_ddp_multi_machine.py --epochs 5 &
# GPU 2(在该节点上 rank=6)
export RANK=6
export LOCAL_RANK=2
python train_ddp_multi_machine.py --epochs 5 &
# GPU 3(在该节点上 rank=7)
export RANK=7
export LOCAL_RANK=3
python train_ddp_multi_machine.py --epochs 5 &
Tip:在实际集群中,可以编写一个 bash
脚本或使用作业调度系统(如 SLURM、Kubernetes)一次性分发多个进程、配置好环境变量。
5.3 代码示例:跨主机 DDP 脚本
train_ddp_multi_machine.py
与单机脚本大同小异,只需在 init_process_group
中保持 init_method="env://"
即可。示例略去了网络通信细节:
# train_ddp_multi_machine.py
import os
import argparse
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
def setup(rank, world_size):
dist.init_process_group(
backend="nccl",
init_method="env://", # 使用环境变量 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE
world_size=world_size,
rank=rank
)
torch.cuda.set_device(rank % torch.cuda.device_count())
# rank % gpu_count,用于在多机多卡时自动映射对应 GPU
def cleanup():
dist.destroy_process_group()
class SimpleCNN(nn.Module):
def __init__(self):
super(SimpleCNN, self).__init__()
self.conv = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
self.relu = nn.ReLU()
self.fc = nn.Linear(32 * 28 * 28, 10)
def forward(self, x):
x = self.conv(x)
x = self.relu(x)
x = x.view(x.size(0), -1)
x = self.fc(x)
return x
def demo_ddp(rank, world_size, args):
print(f"Rank {rank} setting up, world_size {world_size}.")
setup(rank, world_size)
model = SimpleCNN().cuda(rank % torch.cuda.device_count())
ddp_model = DDP(model, device_ids=[rank % torch.cuda.device_count()])
criterion = nn.CrossEntropyLoss().cuda(rank % torch.cuda.device_count())
optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, sampler=sampler)
for epoch in range(args.epochs):
sampler.set_epoch(epoch)
ddp_model.train()
epoch_loss = 0.0
for batch_idx, (data, target) in enumerate(dataloader):
data = data.cuda(rank % torch.cuda.device_count(), non_blocking=True)
target = target.cuda(rank % torch.cuda.device_count(), non_blocking=True)
optimizer.zero_grad()
output = ddp_model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
print(f"Rank {rank}, Epoch [{epoch}], Loss: {epoch_loss/len(dataloader):.4f}")
cleanup()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=3, help="number of total epochs to run")
args = parser.parse_args()
world_size = int(os.environ["WORLD_SIZE"])
rank = int(os.environ["RANK"])
demo_ddp(rank, world_size, args)
if __name__ == "__main__":
main()
代码要点
rank % torch.cuda.device_count()
- 当多机时,
rank
的值会从 0 到world_size-1
。用rank % gpu_count
,可保证同一台机器上的不同进程正确映射到本机的 GPU。
- 当多机时,
init_method="env://"
- 让 PyTorch 自动从
MASTER_ADDR
、MASTER_PORT
、RANK
、WORLD_SIZE
中读取初始化信息,无需手动传递。
- 让 PyTorch 自动从
DataLoader 与
DistributedSampler
- 使用同样的方式划分数据,各进程只读取独立子集。
5.4 多机 DDP 流程图解
┌────────────────────────────────────────────────────────────────────────────────┐
│ 多机多 GPU DDP │
├────────────────────────────────────────────────────────────────────────────────┤
│ Machine A (IP=192.168.0.1) │ Machine B (IP=192.168.0.2) │
│ │ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ ┌────────────┐ │
│ │ Rank=0 GPU0│ │ Rank=1 GPU1│ │ Rank=2 GPU2│ │ Rank=3 GPU3│ │ │ Rank=4 GPU0│ │
│ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │ └──────┬─────┘ │
│ │ │ │ │ │ │ │
│ │ DDP Init │ │ │ │ │ │
│ │ init_method │ │ │ │ │ │
│ │ env:// │ │ │ │ │ │
│ │ │ │ │ │ │ │
│ ┌───▼─────────┐ ┌─▼─────────┐ ┌─▼─────────┐ ┌─▼─────────┐ │ ┌─▼─────────┐ │
│ │ DataLoad0 │ │ DataLoad1 │ │ DataLoad2 │ │ DataLoad3 │ │ │ DataLoad4 │ │
│ │ (子Batch0) │ │ (子Batch1) │ │ (子Batch2) │ │ (子Batch3) │ │ │ (子Batch4) │ │
│ └───┬─────────┘ └─┬─────────┘ └─┬─────────┘ └─┬─────────┘ │ └─┬─────────┘ │
│ │ │ │ │ │ │ │
│ forward│ forward│ forward│ forward│ │ forward│ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ 梯度计算 │ │
│ │ grad0, grad1, grad2, grad3 (A 机) | grad4, grad5, grad6, grad7 (B 机) │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │ │ │
│ │──────────────┼──────────────┼──────────────┼──────┼─────────┼────────┤
│ │ NCCL All-Reduce Across 8 GPUs for gradient sync │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ 每个 GPU 获得同步后梯度 sum_grad │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │ │ │
│ optimizer.step() 执行各自的参数更新 │
│ │ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │ 下一轮 Batch(epoch 或者 step) │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────────────┘
- 两台机器共 8 个进程,启动后每个进程在本机获取子 batch,forward、backward 计算各自梯度。
- NCCL 自动完成跨机器、跨 GPU 的 All-Reduce 操作,最终每个 GPU 拿到同步后的梯度,进而每个进程更新本地模型。
- 通信由 NCCL 负责,底层会在网络和 PCIe 总线上高效调度数据传输。
高阶技巧与优化
6.1 混合精度训练与梯度累积
混合精度训练(Apex AMP / PyTorch Native AMP)
- 使用半精度(FP16)加速训练并节省显存,同时混合保留关键层的全精度(FP32)以保证数值稳定性。
PyTorch Native AMP 示例(在 DDP 上同样适用):
scaler = torch.cuda.amp.GradScaler() for data, target in dataloader: optimizer.zero_grad() with torch.cuda.amp.autocast(): output = ddp_model(data) loss = criterion(output, target) scaler.scale(loss).backward() # 梯度缩放 scaler.step(optimizer) scaler.update()
- DDP 会正确处理混合精度场景下的梯度同步。
梯度累积(Gradient Accumulation)
- 当显存有限时,想要模拟更大的 batch size,可在小 batch 上多步累积梯度,然后再更新一次参数。
- 关键点:在累积期间不调用
optimizer.step()
,只在 N 步后调用;但要确保 DDP 在 backward 时依然执行 All-Reduce。 示例:
accumulation_steps = 4 # 每 4 个小批次累积梯度再更新 for i, (data, target) in enumerate(dataloader): data, target = data.cuda(rank), target.cuda(rank) with torch.cuda.amp.autocast(): output = ddp_model(data) loss = criterion(output, target) / accumulation_steps scaler.scale(loss).backward() if (i + 1) % accumulation_steps == 0: scaler.step(optimizer) scaler.update() optimizer.zero_grad()
- 注意:即使在某些迭代不调用
optimizer.step()
,DDP 的梯度同步(All-Reduce)仍会执行在每次loss.backward()
时,这样确保各进程梯度保持一致。
6.2 模型切分:torch.distributed.pipeline.sync.Pipe
当模型非常大(如上百亿参数)时,单张 GPU 放不下一个完整模型,需将模型拆分到多张 GPU 上做流水线并行(Pipeline Parallelism)。PyTorch 自 1.8 起提供了 torch.distributed.pipeline.sync.Pipe
接口:
- 思路:将模型分割成若干子模块(分段),每个子模块放到不同 GPU 上;然后数据分为若干 micro-batch,经过流水线传递,保证 GPU 间并行度。
示例:
import torch import torch.nn as nn import torch.distributed as dist from torch.distributed.pipeline.sync import Pipe # 假设 2 张 GPU device0 = torch.device("cuda:0") device1 = torch.device("cuda:1") # 定义模型分段 seq1 = nn.Sequential( nn.Conv2d(3, 64, 3, padding=1), nn.ReLU(), # …更多层 ).to(device0) seq2 = nn.Sequential( # 剩余层 nn.Linear(1024, 10) ).to(device1) # 使用 Pipe 封装 model = Pipe(torch.nn.Sequential(seq1, seq2), chunks=4) # chunks 参数指定 micro-batch 数量,用于流水线分割 # Forward 示例 input = torch.randn(32, 3, 224, 224).to(device0) output = model(input)
- 注意:流水线并行与 DDP 并行可以结合,称为混合并行,用于超大模型训练。
6.3 异步数据加载与 DistributedSampler
- 异步数据加载:在 DDP 中,使用
num_workers>0
的DataLoader
可以在 CPU 侧并行加载数据。 pin_memory=True
:将数据预先锁页在内存,拷贝到 GPU 时更高效。DistributedSampler
:- 保证每个进程只使用其对应的那一份数据;
- 在每个 epoch 开始时,调用
sampler.set_epoch(epoch)
以保证不同进程之间的 Shuffle 结果一致; 示例:
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank) dataloader = torch.utils.data.DataLoader( dataset, batch_size=64, sampler=sampler, num_workers=4, pin_memory=True )
- 注意:不要同时对
shuffle=True
和DistributedSampler
传入shuffle=True
,应该使用shuffle=False
。DistributedSampler
会负责乱序。
6.4 NCCL 参数调优与网络优化
NCCL_DEBUG=INFO
或NCCL_DEBUG=TRACE
:开启 NCCL 调试信息,便于排查通信问题。NCCL_SOCKET_IFNAME
:指定用于通信的网卡接口,如eth0
,ens3
,避免 NCCL 默认使用不通的网卡。export NCCL_SOCKET_IFNAME=eth0
NCCL_IB_DISABLE
/NCCL_P2P_LEVEL
:如果不使用 InfiniBand,可禁用 IB;在某些网络环境下,需要调节点对点 (P2P) 级别。export NCCL_IB_DISABLE=1
- 网络带宽与延迟:高带宽、低延迟的网络(如 100Gb/s)对多机训练性能提升非常明显。如果带宽不够,会成为瓶颈。
- Avoid Over-Subscription:避免一个物理 GPU 上跑多个进程(除非特意设置);应确保
world_size <= total_gpu_count
,否则不同进程会争抢同一张卡。
完整示例:ResNet-50 多机多 GPU 训练
下面以 ImageNet 上的 ResNet-50 为例,展示一套完整的多机多 GPU DDP训练脚本结构,帮助你掌握真实项目中的组织方式。
7.1 代码结构一览
resnet50_ddp/
├── train.py # 主脚本,包含 DDP 初始化、训练、验证逻辑
├── model.py # ResNet-50 模型定义或引用 torchvision.models
├── utils.py # 工具函数:MetricLogger、accuracy、checkpoint 保存等
├── dataset.py # ImageNet 数据集封装与 DataLoader 创建
├── config.yaml # 超参数、数据路径、分布式相关配置
└── launch.sh # 启动脚本,用于多机多 GPU 环境变量设置与启动
7.2 核心脚本详解
7.2.1 config.yaml
示例
# config.yaml
data:
train_dir: /path/to/imagenet/train
val_dir: /path/to/imagenet/val
batch_size: 256
num_workers: 8
model:
pretrained: false
num_classes: 1000
optimizer:
lr: 0.1
momentum: 0.9
weight_decay: 1e-4
training:
epochs: 90
print_freq: 100
distributed:
backend: nccl
7.2.2 model.py
示例
# model.py
import torch.nn as nn
import torchvision.models as models
def create_model(num_classes=1000, pretrained=False):
model = models.resnet50(pretrained=pretrained)
# 替换最后的全连接层
in_features = model.fc.in_features
model.fc = nn.Linear(in_features, num_classes)
return model
7.2.3 dataset.py
示例
# dataset.py
import torch
from torchvision import datasets, transforms
def build_dataloader(data_dir, batch_size, num_workers, is_train, world_size, rank):
if is_train:
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])
dataset = datasets.ImageFolder(root=data_dir, transform=transform)
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, sampler=sampler,
num_workers=num_workers, pin_memory=True
)
else:
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])
dataset = datasets.ImageFolder(root=data_dir, transform=transform)
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, sampler=sampler,
num_workers=num_workers, pin_memory=True
)
return dataloader
7.2.4 utils.py
常用工具
# utils.py
import torch
import time
class MetricLogger(object):
def __init__(self):
self.meters = {}
def update(self, **kwargs):
for k, v in kwargs.items():
if k not in self.meters:
self.meters[k] = SmoothedValue()
self.meters[k].update(v)
def __str__(self):
return " ".join(f"{k}: {str(v)}" for k, v in self.meters.items())
class SmoothedValue(object):
def __init__(self, window_size=20):
self.window_size = window_size
self.deque = []
self.total = 0.0
self.count = 0
def update(self, val):
self.deque.append(val)
self.total += val
self.count += 1
if len(self.deque) > self.window_size:
removed = self.deque.pop(0)
self.total -= removed
self.count -= 1
def __str__(self):
avg = self.total / self.count if self.count != 0 else 0
return f"{avg:.4f}"
def accuracy(output, target, topk=(1,)):
""" 计算 top-k 准确率 """
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res # 返回 list: [top1_acc, top5_acc,...]
7.2.5 train.py
核心示例
# train.py
import os
import yaml
import argparse
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.optim as optim
import torch.nn as nn
from model import create_model
from dataset import build_dataloader
from utils import MetricLogger, accuracy
def setup(rank, world_size, args):
dist.init_process_group(
backend=args["distributed"]["backend"],
init_method="env://",
world_size=world_size,
rank=rank
)
torch.cuda.set_device(rank % torch.cuda.device_count())
def cleanup():
dist.destroy_process_group()
def train_one_epoch(epoch, model, criterion, optimizer, dataloader, rank, world_size, args):
model.train()
sampler = dataloader.sampler
sampler.set_epoch(epoch) # 同步 shuffle
metrics = MetricLogger()
for batch_idx, (images, labels) in enumerate(dataloader):
images = images.cuda(rank % torch.cuda.device_count(), non_blocking=True)
labels = labels.cuda(rank % torch.cuda.device_count(), non_blocking=True)
output = model(images)
loss = criterion(output, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
top1, top5 = accuracy(output, labels, topk=(1,5))
metrics.update(loss=loss.item(), top1=top1.item(), top5=top5.item())
if batch_idx % args["training"]["print_freq"] == 0 and rank == 0:
print(f"Epoch [{epoch}] Batch [{batch_idx}/{len(dataloader)}]: {metrics}")
def evaluate(model, criterion, dataloader, rank, args):
model.eval()
metrics = MetricLogger()
with torch.no_grad():
for images, labels in dataloader:
images = images.cuda(rank % torch.cuda.device_count(), non_blocking=True)
labels = labels.cuda(rank % torch.cuda.device_count(), non_blocking=True)
output = model(images)
loss = criterion(output, labels)
top1, top5 = accuracy(output, labels, topk=(1,5))
metrics.update(loss=loss.item(), top1=top1.item(), top5=top5.item())
if rank == 0:
print(f"Validation: {metrics}")
def main():
parser = argparse.ArgumentParser(description="PyTorch DDP ResNet50 Training")
parser.add_argument("--config", default="config.yaml", help="path to config file")
args = parser.parse_args()
with open(args.config, "r") as f:
config = yaml.safe_load(f)
world_size = int(os.environ["WORLD_SIZE"])
rank = int(os.environ["RANK"])
setup(rank, world_size, config)
# 构建模型
model = create_model(num_classes=config["model"]["num_classes"], pretrained=config["model"]["pretrained"])
model = model.cuda(rank % torch.cuda.device_count())
ddp_model = DDP(model, device_ids=[rank % torch.cuda.device_count()])
criterion = nn.CrossEntropyLoss().cuda(rank % torch.cuda.device_count())
optimizer = optim.SGD(ddp_model.parameters(), lr=config["optimizer"]["lr"],
momentum=config["optimizer"]["momentum"],
weight_decay=config["optimizer"]["weight_decay"])
# 构建 DataLoader
train_loader = build_dataloader(
config["data"]["train_dir"],
config["data"]["batch_size"],
config["data"]["num_workers"],
is_train=True,
world_size=world_size,
rank=rank
)
val_loader = build_dataloader(
config["data"]["val_dir"],
config["data"]["batch_size"],
config["data"]["num_workers"],
is_train=False,
world_size=world_size,
rank=rank
)
# 训练与验证流程
for epoch in range(config["training"]["epochs"]):
if rank == 0:
print(f"Starting epoch {epoch}")
train_one_epoch(epoch, ddp_model, criterion, optimizer, train_loader, rank, world_size, config)
if rank == 0:
evaluate(ddp_model, criterion, val_loader, rank, config)
cleanup()
if __name__ == "__main__":
main()
解释要点
setup
与cleanup
- 仍是基于环境变量自动初始化和销毁进程组。
模型与 DDP 包装
- 通过
model.cuda(...)
将模型搬到本地 GPU,再用DDP(model, device_ids=[...])
包装。
- 通过
学习率、优化器
- 常用的
SGD
,学习率可在单机训练基础上除以world_size
(即线性缩放法),如此 batch size 变大仍能保持稳定。
- 常用的
DataLoader
- 复用了
build_dataloader
函数,DistributedSampler
做数据切分。 pin_memory=True
、num_workers
可加速数据预处理与拷贝。
- 复用了
打印日志
- 只让
rank==0
的进程负责打印主进程信息,避免日志冗余。
- 只让
验证
- 在每个 epoch 后让
rank==0
进程做验证并打印;当然也可以让所有进程并行做验证,但通常只需要一个进程做验证节省资源。
- 在每个 epoch 后让
7.3 训练流程示意
┌───────────────────────────────────────────────────────────────────────────┐
│ 2台机器 × 4 GPU 共 8 卡 │
├───────────────────────────────────────────────────────────────────────────┤
│ Machine A (192.168.0.1) │ Machine B (192.168.0.2) │
│ RANK=0 GPU0 ─ train.py │ RANK=4 GPU0 ─ train.py │
│ RANK=1 GPU1 ─ train.py │ RANK=5 GPU1 ─ train.py │
│ RANK=2 GPU2 ─ train.py │ RANK=6 GPU2 ─ train.py │
│ RANK=3 GPU3 ─ train.py │ RANK=7 GPU3 ─ train.py │
└───────────────────────────────────────────────────────────────────────────┘
│ │
│ DDP init -> 建立全局进程组 │
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Train Loader 0 │ │ Train Loader 4 │
│ (Rank0 数据子集) │ │ (Rank4 数据子集) │
└─────────────────┘ └─────────────────┘
│ │
│ ... │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Train Loader 3 │ │ Train Loader 7 │
│ (Rank3 数据子集) │ │ (Rank7 数据子集) │
└─────────────────┘ └─────────────────┘
│ │
│ 每张 GPU 独立 forward/backward │
│ │
▼ ▼
┌───────────────────────────────────────────────────────────────────────────┐
│ NCCL All-Reduce │
│ 所有 8 张 GPU 跨网络同步梯度 Sum / 平均 │
└───────────────────────────────────────────────────────────────────────────┘
│ │
│ 每张 GPU independently optimizer.step() 更新本地权重 │
│ │
▼ ▼
... ...
- 网络同步:所有 GPU 包括跨节点 GPU 都参与 NCCL 通信,实现高效梯度同步。
- 同步时机:在每次
loss.backward()
时 DDP 会等待所有 GPU 完成该次 backward,才进行梯度同步(All-Reduce),保证更新一致性。
常见问题与调试思路
进程卡死/死锁
- DDP 在
backward()
过程中会等待所有 GPU 梯度同步,如果某个进程因为数据加载或异常跳过了 backward,就会导致 All-Reduce 等待超时或永久阻塞。 - 方案:检查
DistributedSampler
是否正确设置,确认每个进程都有相同的 Iteration 次数;若出现异常导致提前跳出训练循环,也会卡住其他进程。
- DDP 在
OOM(Out of Memory)
- 每个进程都使用该进程绑定的那张 GPU,因此要确保
batch_size
/world_size
合理划分。 batch_size
应当与卡数成比例,如原来单卡 batch=256,若 8 卡并行,单卡可维持 batch=256 或者按线性缩放总 batch=2048 分配到每卡 256。
- 每个进程都使用该进程绑定的那张 GPU,因此要确保
梯度不一致/训练数值不对
- 可能由于未启用
torch.backends.cudnn.benchmark=False
或cudnn.deterministic=True
导致不同进程数据顺序不一致;也有可能是忘记在每个 epoch 调用sampler.set_epoch()
,导致 shuffle 不一致。 - 方案:固定随机种子
torch.manual_seed(seed)
并在sampler.set_epoch(epoch)
时使用相同的 seed。
- 可能由于未启用
NCCL 报错
- 常见错误:
NCCL timeout
、peer to peer access unable
、All 8 processes did not hit barrier
。 方案:
- 检查网络连通性,包括
MASTER_ADDR
、MASTER_PORT
、网卡是否正确; - 设置
NCCL_SOCKET_IFNAME
,确保 NCCL 使用可用网卡; - 检查 NCCL 版本与 GPU 驱动兼容性;
- 在调试时尝试使用
backend="gloo"
,判断是否 NCCL 配置问题。
- 检查网络连通性,包括
- 常见错误:
日志过多
- 进程越多,日志会越多。可在代码中控制
if rank == 0:
才打印。或者使用 Python 的logging
来记录并区分 rank。
- 进程越多,日志会越多。可在代码中控制
单机测试多进程
- 当本地没有多张 GPU,但想测试 DDP 逻辑,可使用
init_method="tcp://127.0.0.1:port"
并用world_size=2
,手动设置CUDA_VISIBLE_DEVICES=0,1
或使用gloo
后端在 CPU 上模拟。
- 当本地没有多张 GPU,但想测试 DDP 逻辑,可使用
总结
本文从并行与分布式的基本概念出发,深入讲解了 PyTorch 中常用的单机多卡并行(DataParallel
)与多机多卡分布式训练(DistributedDataParallel
)的原理和使用方法。重点内容包括:
单机多 GPU
DataParallel
:易用但性能瓶颈;- 推荐使用 DDP 来替代。
分布式训练原理
- All-Reduce 梯度同步,保证每个 GPU 都能拿到一致的梯度;
- 进程组初始化通过环境变量
MASTER_ADDR
、MASTER_PORT
、WORLD_SIZE
、RANK
完成; - NCCL 后端在多机多卡场景下性能优异。
DDP 使用示例
- 单机多卡:
torch.multiprocessing.spawn
或torchrun
启动多进程,并在代码中调用init_process_group
初始化; - 多机多卡:要保证网络连通、SSH 免密登录,并正确设置环境变量或使用脚本分发。
- 单机多卡:
高阶技巧
- 混合精度训练(AMP)加速与省显存;
- 梯度累积可实现超大 batch;
- 模型切分(流水线并行)适用于超大模型;
- NCCL 参数调优与网络优化可提升跨机训练效率。
只要掌握 DDP 的关键步骤,就能在多 GPU 或多机环境中高效地扩展深度学习任务。实践中,务必重视数据划分、通信后端配置和调试策略。希望本文的详细示例与图解能帮助你在 PyTorch 中深入理解并行与分布式训练,并应用到实际项目中,快速提升训练性能与效率。
评论已关闭