5 PyTorch Parallel Training Methods (Single Machine, Multi-GPU) Every Graduate Student Should Know
Overview
Using PyTorch, the author wrote single-machine multi-GPU usage examples of different acceleration libraries on ImageNet, ready for readers to reuse.
It's Friday again, perfect for slacking off: the machines are learning and the humans are bored. Before opening Bilibili to "study", I looked at the half-idle GPUs and decided to write something to keep them fed. So I took 4 cards each from V100-PICE/V100/K80 machines to test which distributed training library is fastest! Now the leftover GPU memory finally gets used up, and I get to look like the teacher's diligent student again.
Take-Away
Using PyTorch, I wrote usage examples (single machine, multi-GPU) of different acceleration libraries on ImageNet. Anyone who needs them can treat them as a quickstart and copy the relevant parts into their own project (GitHub links below):
1. The simple and convenient nn.DataParallel
https://github.com/tczhangzhi/pytorch-distributed/blob/master/dataparallel.py
2. Accelerating parallel training with torch.distributed
https://github.com/tczhangzhi/pytorch-distributed/blob/master/distributed.py
3. Replacing the launcher with torch.multiprocessing
https://github.com/tczhangzhi/pytorch-distributed/blob/master/multiprocessing_distributed.py
4. Further acceleration with apex
https://github.com/tczhangzhi/pytorch-distributed/blob/master/apex_distributed.py
5. An elegant implementation with horovod
https://github.com/tczhangzhi/pytorch-distributed/blob/master/horovod_distributed.py
I measured running times on ImageNet with 4 Tesla V100-PICE cards. Apex gave the best speedup, but the gap to Horovod/Distributed is small, so for everyday work the built-in Distributed is good enough. DataParallel is noticeably slower and not recommended. (Results on V100/K80 will be added later; some other experiments interrupted those runs.)

Below is a brief record of each library's distributed training usage, doubling as a README for the code.
The simple and convenient nn.DataParallel
DataParallel helps us (under single-process control) load the model and data onto multiple GPUs, manage the flow of data between GPUs, and coordinate the model replicas on different GPUs for parallel training (the finer-grained primitives include scatter, gather, and so on).
DataParallel is very easy to use: we just wrap the model with DataParallel and set a few arguments. These are: which GPUs participate in training, device_ids=gpus, and which GPU aggregates the gradients, output_device=gpus[0]. DataParallel then automatically splits and loads the data onto the corresponding GPUs, replicates the model on each GPU, runs the forward pass, and computes and aggregates the gradients:
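To make the scatter/gather data flow concrete, here is a minimal pure-Python sketch of what DataParallel does with a batch (no GPUs involved; `scatter_batch` and `gather_outputs` are illustrative names, not PyTorch APIs):

```python
# Conceptual sketch of DataParallel's data flow: scatter the batch across
# devices, run each shard "in parallel" on a model replica, then gather the
# per-device outputs back on the output device.

def scatter_batch(batch, num_devices):
    """Split a batch into near-equal shards, one per device."""
    shard = (len(batch) + num_devices - 1) // num_devices
    return [batch[i * shard:(i + 1) * shard] for i in range(num_devices)]

def gather_outputs(shards):
    """Concatenate per-device outputs back into one batch."""
    return [x for shard in shards for x in shard]

batch = list(range(10))
shards = scatter_batch(batch, 4)                 # [[0,1,2], [3,4,5], [6,7,8], [9]]
outputs = [[x * 2 for x in s] for s in shards]   # each replica processes its shard
result = gather_outputs(outputs)                 # [0, 2, 4, ..., 18]
```

The real implementation moves tensors between devices and replicates parameters, but the batch bookkeeping is exactly this shape.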
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

Note that both the model and the data must be loaded onto a GPU before DataParallel's module can process them; otherwise an error is raised:

# model.cuda() is required here
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        # images.cuda() / target.cuda() are required here
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

The full training script:

# main.py
import torch
import torch.nn as nn
from torch import optim

gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))

train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)

model = ...
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

optimizer = optim.SGD(model.parameters())

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Launch with:

python main.py

Accelerating parallel training with torch.distributed
Since PyTorch 1.0, the official distributed package finally wraps the common distributed primitives, supporting all_reduce, broadcast, send, receive, and so on, with CPU communication implemented via MPI and GPU communication via NCCL. The PyTorch team has also recommended DistributedDataParallel as the fix for DataParallel's slowness and unbalanced GPU load, and it is quite mature by now.
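To make the all-reduce mentioned above concrete: every process contributes a tensor, the tensors are summed elementwise, and every process receives the same result (this is what averaging gradients across GPUs boils down to). A backend-free sketch of these semantics in plain Python (not the `dist` API):

```python
# All-reduce semantics: each of N workers holds a local gradient vector; after
# all_reduce(SUM), every worker holds the elementwise sum of all N vectors.
def all_reduce_sum(local_values):
    """Given one vector per worker, return the reduced vector every worker sees."""
    total = [0.0] * len(local_values[0])
    for vec in local_values:
        for i, v in enumerate(vec):
            total[i] += v
    return [list(total) for _ in local_values]  # every worker gets a copy

grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]  # 4 workers
reduced = all_reduce_sum(grads)
# every worker now holds [16.0, 20.0]; dividing by world size gives the average
```

DistributedDataParallel performs exactly this reduction on the gradients during backward, overlapped with computation.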
First, receive the local_rank argument that torch.distributed.launch passes to each process; it identifies the GPU the current process should use:

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')
args = parser.parse_args()
print(args.local_rank)

Initialize the process group, and use DistributedSampler so that each process sees a disjoint shard of the dataset:

dist.init_process_group(backend='nccl')

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

Bind the process to its GPU, then wrap the model with DistributedDataParallel:

torch.cuda.set_device(args.local_rank)
model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])

The training loop itself is unchanged:

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
# main.py
import torch
import argparse
import torch.distributed as dist
from torch import optim

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')
args = parser.parse_args()

dist.init_process_group(backend='nccl')
torch.cuda.set_device(args.local_rank)

train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

model = ...
model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])

optimizer = optim.SGD(model.parameters())

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Launch with:

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

Replacing the launcher with torch.multiprocessing
Some readers may be more familiar with torch.multiprocessing; we can also manage the worker processes manually with it, sidestepping a few quirks of torch.distributed.launch's automatic process startup and teardown.
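When mp.spawn starts one process per GPU, the worker receives its process index as its first argument; in a multi-node setting, the global rank passed to init_process_group is usually derived as node_rank * ngpus_per_node + local_rank. A tiny sketch of that bookkeeping (function names are illustrative):

```python
def global_rank(node_rank, ngpus_per_node, local_rank):
    """Global rank to pass to init_process_group in a multi-node mp.spawn setup."""
    return node_rank * ngpus_per_node + local_rank

def world_size(num_nodes, ngpus_per_node):
    """Total number of processes across all nodes."""
    return num_nodes * ngpus_per_node

# 2 nodes x 4 GPUs: the worker spawned for GPU 2 on node 1 gets global rank 6
print(global_rank(1, 4, 2), 'of', world_size(2, 4))  # 6 of 8
```

On a single machine, as in the example below, node_rank is 0 and the process index itself serves as the rank.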
mp.spawn starts one process per GPU; each process runs main_worker with its process index as the first argument:

import torch.multiprocessing as mp

mp.spawn(main_worker, nprocs=4, args=(4, myargs))

Inside main_worker, the process index proc takes the role of local_rank, and the process group is initialized explicitly with a TCP address, the world size, and the rank:

def main_worker(proc, ngpus_per_node, args):
    dist.init_process_group(backend='nccl', init_method='tcp://127.0.0.1:23456',
                            world_size=ngpus_per_node, rank=proc)
    torch.cuda.set_device(proc)
    ...

The full training script:

# main.py
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch import optim

def main_worker(proc, ngpus_per_node, args):
    dist.init_process_group(backend='nccl', init_method='tcp://127.0.0.1:23456',
                            world_size=ngpus_per_node, rank=proc)
    torch.cuda.set_device(proc)
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
    model = ...
    model.cuda()
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[proc])
    optimizer = optim.SGD(model.parameters())
    for epoch in range(100):
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
            output = model(images)
            loss = criterion(output, target)
            ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

mp.spawn(main_worker, nprocs=4, args=(4, myargs))

Launch with:

python main.py

Further acceleration with Apex
Apex is NVIDIA's open-source library for mixed-precision and distributed training. Apex wraps the mixed-precision training workflow so that changing two or three lines of configuration enables it, greatly reducing GPU memory usage and saving computation time. Apex also wraps distributed training, with optimizations targeting NVIDIA's NCCL communication library.
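The loss scaling that amp performs exists because small gradients underflow to zero in fp16; multiplying the loss by a scale factor before backward (and unscaling the gradients afterwards) keeps them representable. A sketch of the underflow problem using NumPy's float16 (the numbers are illustrative):

```python
import numpy as np

grad = 1e-8              # a gradient this small is below fp16's representable range
scale = 65536.0          # a typical power-of-two loss scale

naive = np.float16(grad)           # underflows to 0.0: the update is silently lost
scaled = np.float16(grad * scale)  # survives as a small but nonzero fp16 value
recovered = float(scaled) / scale  # unscale in fp32 after backward

print(float(naive))      # 0.0
print(recovered)         # close to 1e-8 again
```

This is why scaled_loss.backward() appears in the training loop below instead of loss.backward().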
For mixed precision, wrap the model and optimizer with amp.initialize:

from apex import amp

model, optimizer = amp.initialize(model, optimizer)

For distributed training, Apex provides its own DistributedDataParallel, a drop-in replacement for torch.nn.parallel.DistributedDataParallel that needs no device_ids argument:

from apex.parallel import DistributedDataParallel

model = DistributedDataParallel(model)
# # torch.distributed equivalent:
# model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
# model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank)

In the training loop, scale the loss before backpropagation so fp16 gradients do not underflow:

with amp.scale_loss(loss, optimizer) as scaled_loss:
    scaled_loss.backward()

The full training script:

# main.py
import torch
import argparse
import torch.distributed as dist
from torch import optim
from apex import amp
from apex.parallel import DistributedDataParallel

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')
args = parser.parse_args()

dist.init_process_group(backend='nccl')
torch.cuda.set_device(args.local_rank)

train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

model = ...
model.cuda()
optimizer = optim.SGD(model.parameters())
model, optimizer = amp.initialize(model, optimizer)
model = DistributedDataParallel(model)

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        optimizer.zero_grad()
        with amp.scale_loss(loss, optimizer) as scaled_loss:
            scaled_loss.backward()
        optimizer.step()

Launch with:

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

An elegant implementation with Horovod
Horovod is Uber's open-source deep learning tool. Its design draws on the strengths of Facebook's "Training ImageNet In 1 Hour" and Baidu's "Ring Allreduce", and it integrates painlessly with deep learning frameworks such as PyTorch and TensorFlow for parallel training.
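The ring-allreduce that Horovod builds on passes gradient chunks around a ring of N workers in 2(N-1) steps, so per-worker bandwidth stays constant as N grows. A pure-Python simulation of the idea (simplified to one scalar per chunk, with no real communication):

```python
def ring_all_reduce(worker_data):
    """Simulate ring all-reduce: every worker ends with the elementwise sum.
    worker_data: list of N vectors, each split into N chunks (scalars here)."""
    n = len(worker_data)
    data = [list(v) for v in worker_data]
    # Phase 1: reduce-scatter. Each step, worker i sends one chunk to its ring
    # neighbor (i+1) % n, which accumulates it. After n-1 steps, every chunk's
    # complete sum lives on exactly one worker.
    for step in range(n - 1):
        sends = [(i, (i - step) % n, data[i][(i - step) % n]) for i in range(n)]
        for i, c, val in sends:
            data[(i + 1) % n][c] += val
    # Phase 2: all-gather. The completed chunks travel around the ring once
    # more, overwriting the stale partial values on every other worker.
    for step in range(n - 1):
        sends = [(i, (i + 1 - step) % n, data[i][(i + 1 - step) % n]) for i in range(n)]
        for i, c, val in sends:
            data[(i + 1) % n][c] = val
    return data

workers = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]   # 3 workers, 3 chunks each
print(ring_all_reduce(workers))  # every worker: [12, 15, 18]
```

Each worker only ever talks to its two ring neighbors, which is what makes the algorithm bandwidth-optimal regardless of worker count.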
Horovod uses its own launcher, horovodrun, and exposes the process identity through hvd.local_rank() and hvd.rank() after hvd.init():

import horovod.torch as hvd

hvd.init()
torch.cuda.set_device(hvd.local_rank())

Partition the dataset with DistributedSampler:

train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

Broadcast the initial parameters from rank 0 and wrap the optimizer so gradients are averaged with ring-allreduce (optionally compressing them to fp16 during communication):

hvd.broadcast_parameters(model.state_dict(), root_rank=0)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(),
                                     compression=hvd.Compression.fp16)

The full training script:

# main.py
import torch
import horovod.torch as hvd
from torch import optim

hvd.init()
torch.cuda.set_device(hvd.local_rank())

train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

model = ...
model.cuda()

optimizer = optim.SGD(model.parameters())
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Launch with:

CUDA_VISIBLE_DEVICES=0,1,2,3 horovodrun -np 4 -H localhost:4 --verbose python main.py