正文
# modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
import
torch
import
torch.distributed
as
dist
import
torch.nn
as
nn
from
torch.nn.parallel
import
DistributedDataParallel
as
DDP
from
torch.profiler
import
profile
import
torch.optim
as
optim
SIZE =
4000
class
ToyModel
(nn.Module)
:
def
__init__
(self)
:
super(ToyModel, self).__init__()
self.net1 = nn.Linear(SIZE, SIZE)
self.relu = nn.ReLU()
self.net2 = nn.Linear(SIZE, SIZE)
self.net3 = nn.Linear(SIZE, SIZE)
def
forward
(self, x)
:
return
self.net3(self.relu(self.net2(self.relu(self.net1(x)))))
def
demo_basic
()
:
dist.init_process_group(
"nccl"
)
rank = dist.get_rank()
print(
f"Start running basic DDP example on rank
{rank}
."
)
model = ToyModel().to(rank)
ddp_model = DDP(model, bucket_cap_mb=
25
, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=
0.001
)
with
profile(
record_shapes=
True
,
activities=[
torch.profiler.ProfilerActivity.CPU,
torch.profiler.ProfilerActivity.CUDA,
],
)
as
prof:
for
i
in
range(
10
):
optimizer.zero_grad()
outputs = ddp_model(torch.randn(
1000
, SIZE, device=rank))
labels = torch.randn(
1000
, SIZE, device=rank)
loss_fn(outputs, labels).backward()
optimizer.step()
if
rank ==
0
:
prof.export_chrome_trace(
"trace_ddp_example.json"
)
if
__name__ ==
"__main__"
:
demo_basic()
# torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=localhost:29400 ddp_example.py
作者分析了几分钟这个代码中一个iter的pytorch profiler结果,我们可以看到前向Pass,反向Pass,优化器更新参数,以及AllReduce的通信时间以及部分AllReduce被重叠到了反向计算中。这就引入到了下一张slides。
这里作者讲了一下DDP里面的AllReduce是怎么和Backward Pass重叠的,这个建议阅读这篇博客:https://zhuanlan.zhihu.com/p/485208899 ,从这张Slides的PyTorch Profiler图我们也可以发现一些其它信息,例如在同一个Stream上的kernel是顺序执行,所以为了重叠计算和通信这里使用了两个Stream。由于网络最开始的几个层必须等待梯度计算完毕才能开始AllReduce,所以存在无法重叠的层。
这张Slides提了一下yTorch DDP的内部机制,包括:
-
-
使用 autograd hooks 在构建时注册,用于触发梯度同步
-
Reducer 组件会异步执行 allreduce 操作来计算所有进程间的梯度平均值
-
计算完成后,平均后的梯度会被写入所有参数的 param.grad 字段
-
在反向传播完成后,不同 DDP 进程中相同参数的梯度值应该是一致的
-
-
-
NCCL API 的调用是在 PyTorch 的 ProcessGroupNCCL.cpp 文件中通过 Reducer 完成的
这张Slides开始介绍NCCL库中的nccl AllReduce API函数。该函数用于对长度为count的数据数组进行规约(reduce)操作,使用指定的op操作符进行计算,并将相同的结果复制到每个recvbuff中。当sendbuff和recvbuff指向相同位置时,会执行原地操作。这是一个在分布式深度学习中常用的集合通信操作,用于在多个GPU之间同步和聚合数据。
这张Slides介绍了NCCL通信器对象的两种使用场景:一种是每个CPU进程对应一个GPU的情况,此时root进程会生成唯一ID并广播给所有进程,所有进程用相同的ID和唯一的rank初始化通信器例如MPI;另一种是单个CPU进程管理多个GPU的情况,这时不需要广播ID,而是通过循环来初始化每个rank,并可以使用封装好的ncclCommInitAll函数来简化这个过程。Slides右侧的代码示例展示了这些初始化操作的具体实现方式。
这张Slides展示了错误处理宏定义
#define CUDACHECK(cmd) {
cudaError_t err = cmd;
if (err != cudaSuccess) {
printf("Failed: Cuda error %s:%d\n",
__FILE__,__LINE__,cudaGetErrorString(err));
exit(EXIT_FAILURE);
}
}
#define NCCLCHECK(cmd) {
ncclResult_t res = cmd;
if (res != ncclSuccess) {
printf("Failed: NCCL error %s:%d\n",
__FILE__,__LINE__,ncclGetErrorString(res));
exit(EXIT_FAILURE);
}
}
这部分定义了两个错误处理宏:
-
CUDACHECK
: 用于检查CUDA API调用的错误