PyTorch分布式訓(xùn)練簡(jiǎn)明教程
AI編輯:我是小將
神經(jīng)網(wǎng)絡(luò)訓(xùn)練加速的最簡(jiǎn)單方法是使用GPU,對(duì)弈神經(jīng)網(wǎng)絡(luò)中常規(guī)操作(矩陣乘法和加法)GPU運(yùn)算速度要倍超于CPU。隨著模型或數(shù)據(jù)集越來(lái)越大,一個(gè)GPU很快就會(huì)變得不足。例如,BERT和GPT-2等大型語(yǔ)言模型是在數(shù)百個(gè)GPU上訓(xùn)練的。對(duì)于多GPU訓(xùn)練,需要一種在不同GPU之間對(duì)模型和數(shù)據(jù)進(jìn)行切分和調(diào)度的方法。
PyTorch是非常流行的深度學(xué)習(xí)框架,它在主流框架中對(duì)于靈活性和易用性的平衡最好。Pytorch有兩種方法可以在多個(gè)GPU上切分模型和數(shù)據(jù):nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需簡(jiǎn)單包裝單GPU模型)。然而,由于它使用一個(gè)進(jìn)程來(lái)計(jì)算模型權(quán)重,然后在每個(gè)批處理期間將分發(fā)到每個(gè)GPU,因此通信很快成為一個(gè)瓶頸,GPU利用率通常很低。而且,nn.DataParallel要求所有的GPU都在同一個(gè)節(jié)點(diǎn)上(不支持分布式),而且不能使用Apex進(jìn)行混合精度訓(xùn)練。nn.DataParallel和nn.distributedataparallel的主要差異可以總結(jié)為以下幾點(diǎn)(譯者注):
DistributedDataParallel支持模型并行,而DataParallel并不支持,這意味如果模型太大單卡顯存不足時(shí)只能使用前者;DataParallel是單進(jìn)程多線程的,只用于單卡情況,而DistributedDataParallel是多進(jìn)程的,適用于單機(jī)和多機(jī)情況,真正實(shí)現(xiàn)分布式訓(xùn)練;DistributedDataParallel的訓(xùn)練更高效,因?yàn)槊總€(gè)進(jìn)程都是獨(dú)立的Python解釋器,避免GIL問(wèn)題,而且通信成本低其訓(xùn)練速度更快,基本上DataParallel已經(jīng)被棄用;必須要說(shuō)明的是 DistributedDataParallel中每個(gè)進(jìn)程都有獨(dú)立的優(yōu)化器,執(zhí)行自己的更新過(guò)程,但是梯度通過(guò)通信傳遞到每個(gè)進(jìn)程,所有執(zhí)行的內(nèi)容是相同的;
總的來(lái)說(shuō),Pytorch文檔是相當(dāng)完備和清晰的,尤其是在1.0x版本后。但是關(guān)于DistributedDataParallel的介紹卻較少,主要的文檔有以下三個(gè):
Writing Distributed Applications with PyTorch:主要介紹分布式API,分布式配置,不同通信機(jī)制以及內(nèi)部機(jī)制,但是說(shuō)實(shí)話大部分人不太同意看懂,而且很少會(huì)直接用這些; Getting Started with Distributed Data Parallel:簡(jiǎn)單介紹了如何使用 DistributedDataParallel,但是用例并不清晰完整;ImageNet training in PyTorch:比較完整的使用實(shí)例,但是僅有代碼,缺少詳細(xì)說(shuō)明;( apex也提供了一個(gè)類似的訓(xùn)練用例Mixed Precision ImageNet Training in PyTorch)(advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亞馬遜云上進(jìn)行分布式訓(xùn)練,但是估計(jì)很多人用不到。
這篇教程將通過(guò)一個(gè)MNISI例子講述如何使用PyTorch的分布式訓(xùn)練,這里將一段段代碼進(jìn)行解釋,而且也包括任何使用apex進(jìn)行混合精度訓(xùn)練。
DistributedDataParallel內(nèi)部機(jī)制
DistributedDataParallel通過(guò)多進(jìn)程在多個(gè)GPUs間復(fù)制模型,每個(gè)GPU都由一個(gè)進(jìn)程控制(當(dāng)然可以讓每個(gè)進(jìn)程控制多個(gè)GPU,但這顯然比每個(gè)進(jìn)程有一個(gè)GPU要慢;也可以多個(gè)進(jìn)程在一個(gè)GPU上運(yùn)行)。GPU可以都在同一個(gè)節(jié)點(diǎn)上,也可以分布在多個(gè)節(jié)點(diǎn)上。每個(gè)進(jìn)程都執(zhí)行相同的任務(wù),并且每個(gè)進(jìn)程都與所有其他進(jìn)程通信。進(jìn)程或者說(shuō)GPU之間只傳遞梯度,這樣網(wǎng)絡(luò)通信就不再是瓶頸。

在訓(xùn)練過(guò)程中,每個(gè)進(jìn)程從磁盤加載batch數(shù)據(jù),并將它們傳遞到其GPU。每一個(gè)GPU都有自己的前向過(guò)程,然后梯度在各個(gè)GPUs間進(jìn)行All-Reduce。每一層的梯度不依賴于前一層,所以梯度的All-Reduce和后向過(guò)程同時(shí)計(jì)算,以進(jìn)一步緩解網(wǎng)絡(luò)瓶頸。在后向過(guò)程的最后,每個(gè)節(jié)點(diǎn)都得到了平均梯度,這樣模型參數(shù)保持同步。
這都要求多個(gè)進(jìn)程(可能在多個(gè)節(jié)點(diǎn)上)同步并通信。Pytorch通過(guò)distributed.init_process_group函數(shù)來(lái)實(shí)現(xiàn)這一點(diǎn)。他需要知道進(jìn)程0位置以便所有進(jìn)程都可以同步,以及預(yù)期的進(jìn)程總數(shù)。每個(gè)進(jìn)程都需要知道進(jìn)程總數(shù)及其在進(jìn)程中的順序,以及使用哪個(gè)GPU。通常將進(jìn)程總數(shù)稱為world_size.Pytorch提供了nn.utils.data.DistributedSampler來(lái)為各個(gè)進(jìn)程切分?jǐn)?shù)據(jù),以保證訓(xùn)練數(shù)據(jù)不重疊。
實(shí)例講解
這里通過(guò)一個(gè)MNIST實(shí)例來(lái)講解,我們先將其改成分布式訓(xùn)練,然后增加混合精度訓(xùn)練。
普通單卡訓(xùn)練
首先,導(dǎo)入所需要的庫(kù):
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp
然后我們定義一個(gè)簡(jiǎn)單的CNN模型處理MNIST數(shù)據(jù):
class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
主函數(shù)main()接受參數(shù),執(zhí)行訓(xùn)練:
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
train(0, args)
其中訓(xùn)練部分主函數(shù)為:
def train(gpu, args):
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Data loading code
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True)
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i + 1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
epoch + 1,
args.epochs,
i + 1,
total_step,
loss.item())
)
if gpu == 0:
print("Training complete in: " + str(datetime.now() - start))
通過(guò)啟動(dòng)主函數(shù)來(lái)開(kāi)始訓(xùn)練:
if __name__ == '__main__':
main()
你可能注意到有些參數(shù)是多余的,但是對(duì)后面的分布式訓(xùn)練是有用的。我們通過(guò)執(zhí)行以下語(yǔ)句就可以在單機(jī)單卡上訓(xùn)練:
python src/mnist.py -n 1 -g 1 -nr 0
分布式訓(xùn)練
使用多進(jìn)程進(jìn)行分布式訓(xùn)練,我們需要為每個(gè)GPU啟動(dòng)一個(gè)進(jìn)程。每個(gè)進(jìn)程需要知道自己運(yùn)行在哪個(gè)GPU上,以及自身在所有進(jìn)程中的序號(hào)。對(duì)于多節(jié)點(diǎn),我們需要在每個(gè)節(jié)點(diǎn)啟動(dòng)腳本。
首先,我們要配置基本的參數(shù):
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1,
type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int,
metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
#########################################################
args.world_size = args.gpus * args.nodes #
os.environ['MASTER_ADDR'] = '10.57.23.164' #
os.environ['MASTER_PORT'] = '8888' #
mp.spawn(train, nprocs=args.gpus, args=(args,)) #
#########################################################
其中args.nodes是節(jié)點(diǎn)總數(shù),而args.gpus是每個(gè)節(jié)點(diǎn)的GPU總數(shù)(每個(gè)節(jié)點(diǎn)GPU數(shù)是一樣的),而args.nr 是當(dāng)前節(jié)點(diǎn)在所有節(jié)點(diǎn)的序號(hào)。節(jié)點(diǎn)總數(shù)乘以每個(gè)節(jié)點(diǎn)的GPU數(shù)可以得到world_size,也即進(jìn)程總數(shù)。所有的進(jìn)程需要知道進(jìn)程0的IP地址以及端口,這樣所有進(jìn)程可以在開(kāi)始時(shí)同步,一般情況下稱進(jìn)程0是master進(jìn)程,比如我們會(huì)在進(jìn)程0中打印信息或者保存模型。PyTorch提供了mp.spawn來(lái)在一個(gè)節(jié)點(diǎn)啟動(dòng)該節(jié)點(diǎn)所有進(jìn)程,每個(gè)進(jìn)程運(yùn)行train(i, args),其中i從0到args.gpus - 1。
同樣,我們要修改訓(xùn)練函數(shù):
def train(gpu, args):
############################################################
rank = args.nr * args.gpus + gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank
)
############################################################
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
###############################################################
# Wrap the model
model = nn.parallel.DistributedDataParallel(model,
device_ids=[gpu])
###############################################################
# Data loading code
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,
transform=transforms.ToTensor(),
download=True
)
################################################################
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=args.world_size,
rank=rank
)
################################################################
train_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
##############################
shuffle=False, #
##############################
num_workers=0,
pin_memory=True,
#############################
sampler=train_sampler) #
#############################
...
這里我們首先計(jì)算出當(dāng)前進(jìn)程序號(hào):rank = args.nr * args.gpus + gpu,然后就是通過(guò)dist.init_process_group初始化分布式環(huán)境,其中backend參數(shù)指定通信后端,包括mpi, gloo, nccl,這里選擇nccl,這是Nvidia提供的官方多卡通信框架,相對(duì)比較高效。mpi也是高性能計(jì)算常用的通信協(xié)議,不過(guò)你需要自己安裝MPI實(shí)現(xiàn)框架,比如OpenMPI。gloo倒是內(nèi)置通信后端,但是不夠高效。init_method指的是如何初始化,以完成剛開(kāi)始的進(jìn)程同步;這里我們?cè)O(shè)置的是env://,指的是環(huán)境變量初始化方式,需要在環(huán)境變量中配置4個(gè)參數(shù):MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面兩個(gè)參數(shù)我們已經(jīng)配置,后面兩個(gè)參數(shù)也可以通過(guò)dist.init_process_group函數(shù)中world_size和rank參數(shù)配置。其它的初始化方式還包括共享文件系統(tǒng)以及TCP,比如init_method='tcp://10.1.1.20:23456',其實(shí)也是要提供master的IP地址和端口。注意這個(gè)調(diào)用是阻塞的,必須等待所有進(jìn)程來(lái)同步,如果任何一個(gè)進(jìn)程出錯(cuò),就會(huì)失敗。
對(duì)于模型側(cè),我們只需要用DistributedDataParallel包裝一下原來(lái)的model即可,在背后它會(huì)支持梯度的All-Reduce操作。對(duì)于數(shù)據(jù)側(cè),我們nn.utils.data.DistributedSampler來(lái)給各個(gè)進(jìn)程切分?jǐn)?shù)據(jù),只需要在dataloader中使用這個(gè)sampler就好,值得注意的一點(diǎn)是你要訓(xùn)練循環(huán)過(guò)程的每個(gè)epoch開(kāi)始時(shí)調(diào)用train_sampler.set_epoch(epoch),(主要是為了保證每個(gè)epoch的劃分是不同的)其它的訓(xùn)練代碼都保持不變。
最后就可以執(zhí)行代碼了,比如我們是4節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)是8卡,那么需要在4個(gè)節(jié)點(diǎn)分別執(zhí)行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此時(shí)的有效batch_size其實(shí)是batch_size_per_gpu * world_size,對(duì)于有BN的模型還可以采用同步BN獲取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述講述的是分布式訓(xùn)練過(guò)程,其實(shí)同樣適用于評(píng)估或者測(cè)試過(guò)程,比如我們把數(shù)據(jù)劃分到不同的進(jìn)程中進(jìn)行預(yù)測(cè),這樣可以加速預(yù)測(cè)過(guò)程。實(shí)現(xiàn)代碼和上述過(guò)程完全一樣,不過(guò)我們想計(jì)算某個(gè)指標(biāo),那就需要從各個(gè)進(jìn)程的統(tǒng)計(jì)結(jié)果進(jìn)行All-Reduce,因?yàn)槊總€(gè)進(jìn)程僅是計(jì)算的部分?jǐn)?shù)據(jù)的內(nèi)容。比如我們要計(jì)算分類準(zhǔn)確度,我們可以統(tǒng)計(jì)每個(gè)進(jìn)程的數(shù)據(jù)總數(shù)total和分類正確的數(shù)量count,然后進(jìn)行聚合。這里要提的一點(diǎn),當(dāng)用dist.init_process_group初始化分布式環(huán)境時(shí),其實(shí)就是建立一個(gè)默認(rèn)的分布式進(jìn)程組(distributed process group),這個(gè)group同時(shí)會(huì)初始化Pytorch的torch.distributed包。這樣我們可以直接用torch.distributed的API就可以進(jìn)行分布式基本操作了,下面是具體實(shí)現(xiàn):
# define tensor on GPU, count and total is the result at each GPU
t = torch.tensor([count, total], dtype=torch.float64, device='cuda')
dist.barrier() # synchronizes all processes
dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result.
t = t.tolist()
all_count = int(t[0])
all_total = int(t[1])
acc = all_count / all_total
混合精度訓(xùn)練(采用apex)
混合精度訓(xùn)練(混合FP32和FP16訓(xùn)練)可以適用更大的batch_size,而且可以利用NVIDIA Tensor Cores加速計(jì)算。采用NVIDIA的apex進(jìn)行混合精度訓(xùn)練非常簡(jiǎn)單,只需要修改部分代碼:
rank = args.nr * args.gpus + gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank)
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Wrap the model
##############################################################
model, optimizer = amp.initialize(model, optimizer,
opt_level='O2')
model = DDP(model)
##############################################################
# Data loading code
...
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
##############################################################
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
##############################################################
optimizer.step()
...
其實(shí)就兩處變化,首先是采用amp.initialize來(lái)包裝model和optimizer以支持混合精度訓(xùn)練,其中opt_level指的是優(yōu)化級(jí)別,如果為O0或者O3不是真正的混合精度,但是可以用來(lái)確定模型效果和速度的baseline,而O1和O2是混合精度的兩種設(shè)置,可以選擇某個(gè)進(jìn)行混合精度訓(xùn)練。另外一處是在進(jìn)行根據(jù)梯度更新參數(shù)前,要先通過(guò)amp.scale_loss對(duì)梯度進(jìn)行scale以防止梯度下溢(underflowing)。此外,你還可以用apex.parallel.DistributedDataParallel替換nn.DistributedDataParallel。
題外話
我覺(jué)得PyTorch官方的分布式實(shí)現(xiàn)已經(jīng)比較完善,而且性能和效果都不錯(cuò),可以替代的方案是horovod,不僅支持PyTorch還支持TensorFlow和MXNet框架,實(shí)現(xiàn)起來(lái)也是比較容易的,速度方面應(yīng)該不相上下。
參考
Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html?(大部分內(nèi)容來(lái)自此處) torch.distributed?https://pytorch.org/docs/stable/distributed.html
AI編輯:我是小將
推薦閱讀
堪比Focal Loss!解決目標(biāo)檢測(cè)中樣本不平衡的無(wú)采樣方法
這份Kaggle Grandmaster的圖像分類訓(xùn)練技巧,你知道多少?
TensorFlow 2.2.0-rc0,這次更新讓人驚奇!
另辟蹊徑!斯坦福大學(xué)提出邊界框回歸任務(wù)新Loss:GIoU
機(jī)器學(xué)習(xí)算法工程師
? ??? ? ? ? ? ? ? ? ? ? ? ??????????????????一個(gè)用心的公眾號(hào)
?

