PyTorch分布式训练详解教程scatter,gatherisend,irecvall_。。。
PyTorch分布式训练详解教程 scatter, gather & isend, irecv &
all_reduce & DDP
本⽂将从零⼊⼿,简单介绍如何使⽤PyTorch中的多种⽅法进⾏分布式训练。
具体⽽⾔,我们将使⽤四种⽅法,分别是: (1)scatter, gatter; (2)isend, irecv; (3)all_reduce; (4)DataDistributedParallel (DDP).
其简单原理是将数据集分区(partition data),之后分别发送到不同的节点进⾏训练,再将所获得的数据,例如梯度,发送到同⼀个节点进⾏运算如相加求和,再重新将参数分发到不同的结点。
本⽂将以VGG11模型和Cifar10数据集为例,具体介绍如何使⽤这四种⽅法进⾏分布式训练。
本⽂的实验环境为4节点Ubuntu18环境,分别为node0, node1, node2, node3,其中node0作为master节点,负责发送数据到其他节点,收集其他节点的数据,以及计算。请注意,本⽂中master节点特指node0,slave节点指node1, node2, node3(即便它们之间并⾮master和slave的关系)。Python环境为Anaconda下3.8版本,Pytorch 1.4版本。
本⽂不使⽤GPU,⽽是直接⽤CPU进⾏训练。但本⽂的代码只需要修改device参数即可移植到GPU上。master ip为10.10.1.1, port为29501。不同节点之间通过该端⼝进⾏通信。
请注意,为了使得每次跑得结果类似,本⽂中设置了PyTorch和Numpy的随机数,从⽽使得每次的结果相同,⽅便⽐较。
单机训练VGG11模型
⾸先,我们先来在单⼀节点上训练VGG模型以作为对⽐,数据集为Cifar10。代码在此:
接下来,我们主要关注train_model函数的写法,这个函数⽤于训练模型。通过更改这个函数的内容,我们可以实现分布式模型训练。
训练包括三步,根据输⼊数据得出输出数据,将输出与真值对⽐以计算loss,根据loss更新权值。请注意,在单机上batch_size=256。
for batch_idx, (data, target) in enumerate(train_loader):
# 获取数据
data, target = (device), (device)
<_grad()
output = model(data)
# 计算loss
train_loss = criterion(output, target)
# 更新权值
train_loss.backward()
optimizer.step()
多节点训练VGG11模型:总述
接下来,将在多节点进⾏训练。PyTorch多节点训练可参考和。
第⼀步,⾸先运⾏以确保安装了相对应的package。
接下来, 对于多节点训练,⾸先需要初始化多节点进程init_process_group.
这需要3个参数, backend是不同的通讯⽅式,在本⽂中,我们将使⽤gloo进⾏后端通讯。rank, world_size代表了本机的级别和节点数,因为我们是四个节点的cluster,所以rank分别为0,1,2,3,其中master设置为0, world_size设置为4.
代码如下:
def init_process(master_ip, rank, size, vgg_model, backend='gloo'):
""" 初始化环境 """
dist.init_process_group(backend, rank=rank, world_size=size)
# 跑模型
vgg_model(rank, size)
可以看到,⾸先设置了master节点的ip和port,之后初始化了process group。
接下来,我们需要将数据集分成四份(data partition),并分别发送到四台机器上。
具体的⽅法是使⽤DistributedSampler, 并将data_loader中的sampler改成所对应的DistributedSampler:
from torch.utils.data.distributed import DistributedSampler
sampler_d = DistributedSampler(training_set) if torch.distributed.is_available() else None
train_loader = torch.utils.data.DataLoader(training_set, num_workers=2, batch_size=batch_size, sampler=sampler_d, pin_memory=True)
之后就可以跑模型了。
请注意,要想在各个节点都跑起来模型,需要将代码、数据等在每⼀个机⼦上都有⼀份,其内容可以有略微不同,例如,在node0的代码需要设置为rank=0,node1的代码需要设置为rank=1, 以此类推。接下来会对此进⾏详细解说。
此外,在运⾏时,需要将所有的代码在所有的节点运⾏之后,整个训练才会开始。例如,如果world_siz
e=4,也就是四个节点,那么需要分别在四个机⼦上执⾏代码,当第四个机⼦的代码执⾏后,全部训练才会开始。
多节点训练VGG11模型:使⽤scatter和gather
接下来就进⼊正题,如何⽤不同的⽅法多节点训练模型。具体⽽⾔,我们将分别⽤多种⽅法,分布式计算和更新梯度。
numpy教程 pdf⾸先,因为单机上batch_size=256,⽽我们现在有4个节点,也就是四台机器,所以我们设置每台机⼦上的batch_size=256/4=64,这样每⼀个epoch仍然等价于batch_size=256
接下来,应该新建⼀个组,以确保通信发⽣在组内:
group = w_group([0, 1, 2, 3])
分布式实现训练的思路如下,我们将数据分成四份并分别发送到不同的机⼦上,接下来,在每⼀台机⼦上,根据输⼊数据得出输出数据,将输出与真值对⽐以计算loss,以上两步与单机版的思路基本相同。接下来,需要将四台机⼦的parameters梯度发送到master节点,并计算平均值,以获得统⼀的权值,再发送到各个节点进⾏更新,这样就能确保所训练的模型在各个机⼦上相同。
在这⾥,我们使⽤scatter和gather来发送和收集信息。其中,scatter可以将信息从master节点传到所有的其他节点,gather可以将信息从别的节点获取到master节点。
⽤法如下:
# master node
var_list = [s_like(var) for _ in range(4)]
dist.gather(var, var_list, group=group, async_op=False)
# slave node
dist.gather(var, group=group, async_op=False)
对于gather, ⾸先需要在master node新建⼀个空的list来存储tensor,如果有4个节点则list长度为4,分别存储rank 0, 1, 2, 3节点的这个变量的值。
接下来,dist.gather()第⼀个参数指明了需要获取的每个节点的具体变量名。
⽽slave node只需要将tensor传出即可,不需要新建list存储tensor。
以上这个例⼦就是在master node⽤var_list这个list分别收集了node 0~3每个节点var的值。
# master node
var_list = [var for _ in range(4)]
dist.scatter(var, var_list, group=group, src=0, async_op=False)
# slave node
dist.scatter(var, group=group, src=0, async_op=False)
对于scatter, ⾸先需要在master node新建⼀个list来存储tensor,如果有4个节点则list长度为4,分别存储需要发送到rank 0, 1, 2, 3节点的变量。
接下来,dist.scatter()第⼀个参数指明了每个节点接收数据的具体变量名。
⽽slave node只需要将tensor接收即可,接收的变量为var。
以上这个例⼦就是在master node⽤var_list这个list分别复制了四个var,再分别发送到了所有node 0~3.
接下来我们看具体的代码。对于vgg11模型训练⽽⾔,我们⾸先将四个节点的梯度发送到master节点,求平均值后再分别发送到四个节点。
故master代码和其他node的代码不同。代码在此:,
具体代码master(node0)如下:
# 新建组
group = w_group([0, 1, 2, 3])
for batch_idx, (data, target) in enumerate(train_loader):
data, target = (device), (device)
<_grad()
output = model(data)
train_loss = criterion(output, target)
train_loss.backward()
# 以上均和单机版代码相同。接下来遍历本机模型的parameters,并采集其他节点的grad梯度,计算平均值并发送到其他节点
for p in model.parameters():
# 新建⼀个list存储各个节点的梯度
grad_list = [s_ad) for _ in range(4)]
# 获取所有节点的梯度
dist.ad, grad_list, group=group, async_op=False)
# 计算所有节点的平均梯度
grad_sum = s_ad)
for i in range(4):
grad_sum += grad_list[i]
grad_mean = grad_sum / 4
# 新建⼀个list存储将要发到各个节点的平均梯度
scatter_list = [grad_mean for _ in range(4)]
# 将所有的值发送到各个节点
dist.ad, scatter_list, group=group, src=0, async_op=False)
optimizer.step()
简单⽽⾔,master node⾸先获取了所有节点的梯度并计算了平均值,接下来将该平均值分发到了各个slave nodes。
具体代码slave(node1~3)如下:
# 新建组
group = w_group([0, 1, 2, 3])
for batch_idx, (data, target) in enumerate(train_loader):
data, target = (device), (device)
<_grad()
output = model(data)
train_loss = criterion(output, target)
train_loss.backward()
# 以上均和单机版代码相同。接下来遍历本机模型的parameters,并获取grad梯度,发送到master node,并从master node获取平均值后更新grad
for p in model.parameters():
# 将grad值发送到master node
dist.ad, group=group, async_op=False)
# 接收master node发送过来的grad值
dist.ad, group=group, src=0, async_op=False)
optimizer.step()
slave node⾸先将该节点的梯度发送到了master node, 之后接收了master node计算的梯度平均值.
多节点训练VGG11模型:使⽤isend和irecv
除了scatter和gather, 另⼀种⽅法是使⽤isend和irecv。
isend和irecv属于点对点通讯,可以指定发送和接收的⽬标。因此,不需要新建组。
具体思路仍然是将四台机⼦的parameters梯度发送到master节点,并计算平均值,获得统⼀的梯度,再从master node发送到各个节点进⾏更新. isend可以将信息从⼀个节点传到另⼀个节点,同时对应的另⼀个节点需要使⽤irecv进⾏接收。
⽤法如下:
req = dist.isend(tensor=var, dst=rank)
req.wait()
req = dist.irecv(tensor=var, src=rank)
req.wait()
isend和irecv的第⼀个参数即为需要传输和需要接收的变量名var。对于isend,dst指定了发送时的⽬标(rank),⽽对于irecv,src指定了接受时数据的来源(rank)。
在发送和接收后,需要req.wait()以确保⽬标或者来源机器已经收到或者已经发送了数据,以确保各个机器同步。
接下来我们看具体的代码。对于vgg11模型训练⽽⾔,我们⾸先将四个节点的梯度发送到master节点,求平均值后再分别发送到四个节点。
故master代码和其他node的代码不同。代码在此:,
具体代码master(node0)如下:
for batch_idx, (data, target) in enumerate(train_loader):
data, target = (device), (device)
<_grad()
output = model(data)
train_loss = criterion(output, target)
train_loss.backward()
# 以上均和单机版代码相同。接下来遍历本机模型的parameters,并采集其他节点的grad梯度,计算平均值并发送到其他节点
for p in model.parameters():
# 采集其他节点的grad梯度
grad_1 = s_ad)
grad_2 = s_ad)
grad_3 = s_ad)
req = dist.irecv(tensor=grad_1, src=1)
req.wait()
req = dist.irecv(tensor=grad_2, src=2)
req.wait()
req = dist.irecv(tensor=grad_3, src=3)
req.wait()
# 计算所有节点的梯度平均值
grad_mean = (p.grad + grad_1 + grad_2 + grad_3)/4
# 将梯度平均值发送到其他节点
req = dist.isend(tensor=grad_mean, dst=1)
req.wait()
req = dist.isend(tensor=grad_mean, dst=2)
req.wait()
req = dist.isend(tensor=grad_mean, dst=3)
req.wait()
optimizer.step()
具体代码slave(node1~3)如下:
for batch_idx, (data, target) in enumerate(train_loader):
data, target = (device), (device)
<_grad()
output = model(data)
train_loss = criterion(output, target)
train_loss.backward()
# 以上均和单机版代码相同。接下来遍历本机模型的parameters,并获取grad梯度,发送到master node,并从master node获取平均值后更新grad
for p in model.parameters():
# 将本机的梯度发送到master node
req = dist.isend(ad, dst=0)
req.wait()
# 从master node接收平均梯度
req = dist.irecv(ad, src=0)
req.wait()
optimizer.step()
多节点训练VGG11模型:使⽤all_reduce
以上⽅法虽然完成了不同节点之间的通讯,但由于master node和其他node之间代码不同,所以仍然⽐较⿇烦。PyTorch提供了⼀种简便且⾼效的⽅法。具体⽽⾔就是all_reduce函数,该函数使⽤了ring通讯⽅法,使得通讯效率得到了提升。同时,也完成了组内变量的共享和计算。
⽤法如下:
dist.all_reduce(var, duce_op.SUM, group=group, async_op=False)
与gather(), scatter()相似,⾸先需要建⽴⼀个组。all_reduce()第⼀个参数为需要进⾏运算的变量,第⼆个参数op则包含了⼀些⽅法,例如求和SUM,此外还有MIN, MAX等,可参见.
所以以上代码的意思是计算组内所有节点var变量的总和,且返回该var.
具体思路仍然是将四台机⼦的parameters梯度发送到master节点,并计算平均值,获得统⼀的梯度,再从master node发送到各个节点进⾏更新.
isend可以将信息从⼀个节点传到另⼀个节点,同时对应的另⼀个节点需要使⽤irecv进⾏接收。
req = dist.isend(tensor=var, dst=rank)
req.wait()
req = dist.irecv(tensor=var, src=rank)
req.wait()
接下来我们看具体的代码。对于vgg11模型训练⽽⾔,我们⾸先将四个节点的梯度分布除以4,再全部相
加,即可获得平均梯度值。在这⾥,所有机器的代码均相同。代码在此:.
具体代码如下:
# 新建组
group = w_group([0, 1, 2, 3])
for batch_idx, (data, target) in enumerate(train_loader):
data, target = (device), (device)
<_grad()
output = model(data)
train_loss = criterion(output, target)
train_loss.backward()
# 以上均和单机版代码相同。接下来遍历本机模型的parameters,并计算组内所有机器的梯度平均值
for p in model.parameters():
dist.all_ad, duce_op.SUM, group=group, async_op=False)
optimizer.step()
多节点训练VGG11模型:使⽤Distributed Data Parallel
PyTorch还提供了最新的Distributed Data Parallel (DDP) API,通过Gradient Bucketing更⾼效和⽅便地实现了以上⽅法。
在这⾥,所有机器的代码均相同。代码在此:.
具体⽤法如下:
parallel import DistributedDataParallel as DDP
ddp_model = DDP(model)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.1,
momentum=0.9, weight_decay=0.0001)
也就是将model转换为ddp_model,之后即可和单机模型类似进⾏训练。
以上就是使⽤多种⽅法进⾏PyTorch分布式训练的内容了。所有代码可以参考:.
同时,本⽂基于威斯康星⼤学麦迪逊分校CS744改编⽽成,在此感谢Prof.Shivaram Venkataraman. 同时,本项⽬由饶锦蒙和我共同完成。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。