Pytorch Distributed Data Parallel(DDP) 実装例 (pytorch ddp vs huggingface accelerate)
はじめに
DataParallelといえばnn.DataParallel()
でモデルを包んであげるだけで実現できますが、PythonのGILがボトルネックとなり、最大限リソースを活用できません。
最近では、PytorchもDDPを推奨しています。が、ソースの変更点が多く、コーディングの難易度が上がっています。どっちもどっちですね…。
今回はkaggleにある犬と猫の画像データセットを使って、画像の分類問題を例に、DDPを使って見たいと思います。
Dogs vs. Cats Redux: Kernels Edition | Kaggle
Data -> Download All からダウンロード
新しくhuggingface accelerateを用いたDDPの実装を加えました (2021/11/1)
学習コード
モデルはtorchivisionのVGG16を使いました。 ここのtrain_model関数をベースに、DDP向けに改造します。
Finetuning Torchvision Models — PyTorch Tutorials 1.2.0 documentation
- DDPはこれを参考にしました。
https://gist.github.com/sgraaf/5b0caa3a320f28c27c12b5efeb35aa4c
- Dataloaderはこのカーネルを参考にしました。
https://www.kaggle.com/alpaca0984/dog-vs-cat-with-pytorch#Generate-submittion.csv
DataParallel
nn.DataParallelのソース
import time import copy from tqdm import tqdm import multiprocessing as mp import numpy as np import torch import torch.nn as nn import torch.optim as optim from torchvision import transforms import torchvision.models as models from torchvision import datasets import matplotlib.pyplot as plt from src.datasets import DogCatDataset # 設定 IMAGE_SIZE = 224 NUM_CLASSES = 2 BATCH_SIZE = 50 NUM_EPOCH = 1 # シード固定 torch.manual_seed(42) np.random.seed(42) # GPUが使えたらGPUにする device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 画像の場所 train_dir = './data/train' test_dir = './data/test' # データの前処理を定義 transform = transforms.Compose( [ # 画像のリサイズ transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)), # tensorに変換 transforms.ToTensor(), # 正規化, 計算した値を入れる transforms.Normalize(mean=[0.4883, 0.4551, 0.4170], std=[0.2257, 0.2211, 0.2214]) ] ) # 訓練データを取得 train_dataset = DogCatDataset( csv_file="./data/train.csv", root_dir=train_dir, # 画像を保存したディレクトリ(適宜書き換えて) transform=transform ) # train val 分割 n_samples = len(train_dataset) # n_samples is 60000 train_size = int(len(train_dataset) * 0.8) # train_size is 48000 val_size = n_samples - train_size # val_size is 48000 # shuffleしてから分割してくれる. train_dataset, val_dataset = torch.utils.data.random_split( train_dataset, [train_size, val_size]) datasets = {'train': train_dataset, 'val': val_dataset} # Create training and validation dataloaders dataloaders = { x: torch.utils.data.DataLoader( datasets[x], batch_size=BATCH_SIZE, shuffle=True, pin_memory=True, num_workers=mp.cpu_count()) for x in ['train', 'val'] } # 転移学習する model = models.vgg16(pretrained=True, progress=True) # 最終層を2クラスに書き換え(ImageNetは1000クラス) model.classifier[6] = nn.Linear(4096, NUM_CLASSES) # GPUに転送 model = model.to(device) # # GPU 4つ使えるようにする # model = nn.DataParallel(model) # optimizerはSGD optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9) # ロス関数 criterion = nn.CrossEntropyLoss() # 学習スタート # https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html since = time.time() history = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []} best_model_wts = copy.deepcopy(model.state_dict()) best_acc = 0.0 for epoch in range(NUM_EPOCH): print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1)) print('-' * 10) # Each epoch has a training and validation phase for phase in ['train', 'val']: if phase == 'train': model.train() # Set model to training mode else: model.eval() # Set model to evaluate mode running_loss = 0.0 running_corrects = 0 # Iterate over data. for inputs, labels in tqdm(dataloaders[phase]): inputs = inputs.to(device) labels = labels.to(device) # zero the parameter gradients optimizer.zero_grad() # forward # track history if only in train with torch.set_grad_enabled(phase == 'train'): # Get model outputs and calculate loss outputs = model(inputs) loss = criterion(outputs, labels) _, preds = torch.max(outputs, 1) # backward + optimize only if in training phase if phase == 'train': loss.backward() optimizer.step() # statistics running_loss += loss * inputs.size(0) running_corrects += torch.sum(preds == labels.data) epoch_loss = running_loss.item() / len(dataloaders[phase].dataset) epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset) print( '{} Loss: {:.4f} Acc: {:.4f}'.format( phase, epoch_loss, epoch_acc)) # deep copy the model if phase == 'val' and epoch_acc > best_acc: best_acc = epoch_acc best_model_wts = copy.deepcopy(model.state_dict()) if phase == 'train': history['accuracy'].append(epoch_acc.item()) history['loss'].append(epoch_loss) else: history['val_accuracy'].append(epoch_acc.item()) history['val_loss'].append(epoch_loss) print() time_elapsed = time.time() - since print( 'Training complete in {:.0f}m {:.0f}s'.format( time_elapsed // 60, time_elapsed % 60)) print('Best val Acc: {:4f}'.format(best_acc)) # load best model weights model.load_state_dict(best_model_wts) # ロードのときにだるいのでcpuに変更して保存 model = model.to('cpu') torch.save(model.state_dict(), './model/best.pth') # plot acc = history['accuracy'] val_acc = history['val_accuracy'] loss = history['loss'] val_loss = history['val_loss'] epochs_range = range(NUM_EPOCH) plt.figure(figsize=(24, 8)) plt.subplot(1, 2, 1) plt.plot(epochs_range, acc, label='Training Accuracy') plt.plot(epochs_range, val_acc, label='Validation Accuracy') plt.legend(loc='lower right') plt.title('Training and Validation Accuracy') plt.subplot(1, 2, 2) plt.plot(epochs_range, loss, label='Training Loss') plt.plot(epochs_range, val_loss, label='Validation Loss') plt.legend(loc='upper right') plt.title('Training and Validation Loss') plt.savefig("training_results.png")
DDP
DDPのソース
import time import copy import random from tqdm import tqdm import multiprocessing as mp import numpy as np import torch import torch.nn as nn import torch.optim as optim from torchvision import transforms import torchvision.models as models from torchvision import datasets import matplotlib.pyplot as plt from src.datasets import DogCatDataset # DDP from argparse import ArgumentParser import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data.distributed import DistributedSampler parser = ArgumentParser('DDP usage example') parser.add_argument('--local_rank', type=int, default=-1, metavar='N', help='Local process rank.') # you need this argument in your scripts for DDP to work args = parser.parse_args() args.is_master = args.local_rank == 0 # init dist.init_process_group(backend='nccl', init_method='env://') torch.cuda.set_device(args.local_rank) # 設定 IMAGE_SIZE = 224 NUM_CLASSES = 2 BATCH_SIZE = 50 NUM_EPOCH = 1 # シード固定 torch.cuda.manual_seed_all(42) torch.manual_seed(42) np.random.seed(42) random.seed(42) # 画像の場所 train_dir = './data/train' test_dir = './data/test' # データの前処理を定義 transform = transforms.Compose( [ # 画像のリサイズ transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)), # tensorに変換 transforms.ToTensor(), # 正規化, 計算した値を入れる transforms.Normalize(mean=[0.4883, 0.4551, 0.4170], std=[0.2257, 0.2211, 0.2214]) ] ) # 訓練データを取得 train_dataset = DogCatDataset( csv_file="./data/train.csv", root_dir=train_dir, # 画像を保存したディレクトリ(適宜書き換えて) transform=transform ) # train val 分割 n_samples = len(train_dataset) # n_samples is 60000 train_size = int(len(train_dataset) * 0.8) # train_size is 48000 val_size = n_samples - train_size # val_size is 48000 # shuffleしてから分割してくれる. train_dataset, val_dataset = torch.utils.data.random_split( train_dataset, [train_size, val_size]) datasets = {'train': train_dataset, 'val': val_dataset} # Create training and validation dataloaders dataloaders = { x: torch.utils.data.DataLoader( datasets[x], batch_size=BATCH_SIZE, pin_memory=True, sampler=DistributedSampler(datasets[x], rank=args.local_rank), num_workers=mp.cpu_count()) for x in ['train', 'val'] } # 転移学習する model = models.vgg16(pretrained=True, progress=True) # 最終層を2クラスに書き換え(ImageNetは1000クラス) model.classifier[6] = nn.Linear(4096, NUM_CLASSES) # GPUに転送 model = model.cuda() # DDP model = DDP( model, device_ids=[args.local_rank] ) # optimizerはSGD optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9) # ロス関数 criterion = nn.CrossEntropyLoss() # 学習スタート # https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html since = time.time() history = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []} best_model_wts = copy.deepcopy(model.state_dict()) best_acc = 0.0 for epoch in range(NUM_EPOCH): if args.is_master: print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1)) print('-' * 10) # Each epoch has a training and validation phase for phase in ['train', 'val']: dist.barrier() # let all processes sync up before starting with a new epoch of training if phase == 'train': model.train() # Set model to training mode else: model.eval() # Set model to evaluate mode running_loss = 0.0 running_corrects = 0 # Iterate over data. for inputs, labels in tqdm(dataloaders[phase]): inputs = inputs.cuda() labels = labels.cuda() # zero the parameter gradients optimizer.zero_grad() # forward # track history if only in train with torch.set_grad_enabled(phase == 'train'): # Get model outputs and calculate loss outputs = model(inputs) loss = criterion(outputs, labels) _, preds = torch.max(outputs, 1) # backward + optimize only if in training phase if phase == 'train': loss.backward() optimizer.step() # statistics running_loss += loss * inputs.size(0) running_corrects += torch.sum(preds == labels.data) # 他のノードから集める dist.all_reduce(running_loss, op=dist.ReduceOp.SUM) dist.all_reduce(running_corrects, op=dist.ReduceOp.SUM) if args.is_master: epoch_loss = running_loss.item() / len(dataloaders[phase].dataset) epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset) print( '{} Loss: {:.4f} Acc: {:.4f}'.format( phase, epoch_loss, epoch_acc)) # deep copy the model if phase == 'val' and epoch_acc > best_acc: best_acc = epoch_acc best_model_wts = copy.deepcopy(model.state_dict()) if phase == 'train': history['accuracy'].append(epoch_acc.item()) history['loss'].append(epoch_loss) else: history['val_accuracy'].append(epoch_acc.item()) history['val_loss'].append(epoch_loss) print() time_elapsed = time.time() - since if args.is_master: print( 'Training complete in {:.0f}m {:.0f}s'.format( time_elapsed // 60, time_elapsed % 60)) print('Best val Acc: {:4f}'.format(best_acc)) # load best model weights model.load_state_dict(best_model_wts) # ロードのときにだるいのでcpuに変更して保存 model = model.to('cpu') torch.save(model.module.state_dict(), './model/best.pth') # plot acc = history['accuracy'] val_acc = history['val_accuracy'] loss = history['loss'] val_loss = history['val_loss'] epochs_range = range(NUM_EPOCH) plt.figure(figsize=(24, 8)) plt.subplot(1, 2, 1) plt.plot(epochs_range, acc, label='Training Accuracy') plt.plot(epochs_range, val_acc, label='Validation Accuracy') plt.legend(loc='lower right') plt.title('Training and Validation Accuracy') plt.subplot(1, 2, 2) plt.plot(epochs_range, loss, label='Training Loss') plt.plot(epochs_range, val_loss, label='Validation Loss') plt.legend(loc='upper right') plt.title('Training and Validation Loss') plt.savefig("training_results.png") # destrory all processes dist.destroy_process_group()
実行コマンド
- シングルノード、4GPUの場合
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=1 --node_rank 0 train.py
DDPソース説明
parser = ArgumentParser('DDP usage example') parser.add_argument('--local_rank', type=int, default=-1, metavar='N', help='Local process rank.') # you need this argument in your scripts for DDP to work
- DistributedSamplerを使います。pin_memoryは、データのキャッシュ関係のフラグで少し速くなるようです。
dataloaders = { x: torch.utils.data.DataLoader( datasets[x], batch_size=BATCH_SIZE, pin_memory=True, sampler=DistributedSampler(datasets[x], rank=args.local_rank), num_workers=mp.cpu_count()) for x in ['train', 'val'] }
- VGG16を使います。最終層を任意のクラス数に書き換えて、fine-tuningを行います。
- DDPでラップしてあげます。
# 転移学習する model = models.vgg16(pretrained=True, progress=True) # 最終層を2クラスに書き換え(ImageNetは1000クラス) model.classifier[6] = nn.Linear(4096, NUM_CLASSES) # GPUに転送 model = model.cuda() # DDP model = DDP( model, device_ids=[args.local_rank] )
- エポックが始まる前に、全GPUで同期を取ります。
dist.barrier() # let all processes sync up before starting with a new epoch of training
- lossとaccをそれぞれのGPUで算出して、ここで一箇所に集めます。
# 他のノードから集める
dist.all_reduce(running_loss, op=dist.ReduceOp.SUM)
dist.all_reduce(running_corrects, op=dist.ReduceOp.SUM)
- モデルの保存、ログ関係はrankが0のGPUでのみ行います。
if args.is_master:
DDP (accelerate)
この記事を投稿した後に、Huggingfaceから Accelerateというライブラリが登場しました。
Documentによると、簡単なソース変更でDDPやDeepSpeed、mixed precisionなどが実装できるようです。
pipで簡単に入ります
pip install accelerate
コンペでpytorch-lightningを使ってみて、微妙だったのでこのライブラリが最強かもしれません。
DDPのソース (accelerate)
import time import copy from tqdm import tqdm import multiprocessing as mp import numpy as np import torch import torch.nn as nn import torch.optim as optim from torchvision import transforms import torchvision.models as models from torchvision import datasets import matplotlib.pyplot as plt from accelerate import Accelerator from src.datasets import DogCatDataset # 設定 IMAGE_SIZE = 224 NUM_CLASSES = 2 BATCH_SIZE = 50 NUM_EPOCH = 2 # シード固定 torch.manual_seed(42) np.random.seed(42) # デバイスの指定 accelerator = Accelerator() device = accelerator.device # 画像の場所 train_dir = './data/train' test_dir = './data/test' # データの前処理を定義 transform = transforms.Compose( [ # 画像のリサイズ transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)), # tensorに変換 transforms.ToTensor(), # 正規化, 計算した値を入れる transforms.Normalize(mean=[0.4883, 0.4551, 0.4170], std=[0.2257, 0.2211, 0.2214]) ] ) # 訓練データを取得 train_dataset = DogCatDataset( csv_file="./data/train.csv", root_dir=train_dir, # 画像を保存したディレクトリ(適宜書き換えて) transform=transform ) # train val 分割 n_samples = len(train_dataset) # n_samples is 25000 train_size = int(len(train_dataset) * 0.8) # train_size is 20000 val_size = n_samples - train_size # val_size is 5000 # shuffleしてから分割してくれる. train_dataset, val_dataset = torch.utils.data.random_split( train_dataset, [train_size, val_size]) datasets = {'train': train_dataset, 'val': val_dataset} # Create training and validation dataloaders dataloaders = { x: torch.utils.data.DataLoader( datasets[x], batch_size=BATCH_SIZE, shuffle=True, pin_memory=True, drop_last=False if x == 'val' else True, num_workers=mp.cpu_count()) for x in ['train', 'val'] } # 転移学習する model = models.vgg16(pretrained=True, progress=True) # 最終層を2クラスに書き換え(ImageNetは1000クラス) model.classifier[6] = nn.Linear(4096, NUM_CLASSES) # optimizerはSGD optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9) # ロス関数 criterion = nn.CrossEntropyLoss() # Prepare everything # There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the # prepare method. model, optimizer, dataloaders['train'], dataloaders['val'] = accelerator.prepare( model, optimizer, dataloaders['train'], dataloaders['val']) # 学習スタート # https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html since = time.time() history = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []} best_model_wts = copy.deepcopy(model.state_dict()) best_acc = 0.0 for epoch in range(NUM_EPOCH): # Use accelerator.print to print only on the main process. accelerator.print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1)) accelerator.print('-' * 10) # Each epoch has a training and validation phase for phase in ['train', 'val']: if phase == 'train': model.train() # Set model to training mode else: model.eval() # Set model to evaluate mode running_loss = 0.0 running_corrects = 0 # Iterate over data. for inputs, labels in tqdm(dataloaders[phase]): # zero the parameter gradients optimizer.zero_grad() # forward # track history if only in train with torch.set_grad_enabled(phase == 'train'): # Get model outputs and calculate loss outputs = model(inputs) loss = criterion(outputs, labels) _, preds = torch.max(outputs, 1) # backward + optimize only if in training phase if phase == 'train': accelerator.backward(loss) optimizer.step() # statistics running_loss += loss * inputs.size(0) running_corrects += torch.sum(preds == labels.data) all_running_loss = accelerator.gather(running_loss) all_running_corrects = accelerator.gather(running_corrects) if accelerator.is_local_main_process: epoch_loss = all_running_loss.sum().item() / len(dataloaders[phase].dataset) epoch_acc = all_running_corrects.sum().double() / len(dataloaders[phase].dataset) print( '{} Loss: {:.4f} Acc: {:.4f}'.format( phase, epoch_loss, epoch_acc)) # deep copy the model if phase == 'val' and epoch_acc > best_acc: best_acc = epoch_acc unwrapped_model = accelerator.unwrap_model(model) best_model_wts = copy.deepcopy(unwrapped_model.state_dict()) if phase == 'train': history['accuracy'].append(epoch_acc.item()) history['loss'].append(epoch_loss) else: history['val_accuracy'].append(epoch_acc.item()) history['val_loss'].append(epoch_loss) print() if accelerator.is_local_main_process: time_elapsed = time.time() - since print( 'Training complete in {:.0f}m {:.0f}s'.format( time_elapsed // 60, time_elapsed % 60)) print('Best val Acc: {:4f}'.format(best_acc)) # ロードのときにだるいのでcpuに変更して保存 torch.save(best_model_wts, './model/best.pth') # plot acc = history['accuracy'] val_acc = history['val_accuracy'] loss = history['loss'] val_loss = history['val_loss'] epochs_range = range(NUM_EPOCH) plt.figure(figsize=(24, 8)) plt.subplot(1, 2, 1) plt.plot(epochs_range, acc, label='Training Accuracy') plt.plot(epochs_range, val_acc, label='Validation Accuracy') plt.legend(loc='lower right') plt.title('Training and Validation Accuracy') plt.subplot(1, 2, 2) plt.plot(epochs_range, loss, label='Training Loss') plt.plot(epochs_range, val_loss, label='Validation Loss') plt.legend(loc='upper right') plt.title('Training and Validation Loss') plt.savefig("training_results.png")
実行コマンド
- シングルノード、4GPUの場合
まずaccelerate config
コマンドでいろいろ設定します
$ accelerate config In which compute environment are you running? ([0] This machine, [1] AWS (Amazon SageMaker)): 0 Which type of machine are you using? ([0] No distributed training, [1] multi-CPU, [2] multi-GPU, [3] TPU): 2 How many different machines will you use (use more than 1 for multi-node training)? [1]:1 Do you want to use DeepSpeed? [yes/NO]: NO How many processes in total will you use? [1]:4 Do you wish to use FP16 (mixed precision)? [yes/NO]: NO
すると勝手に設定してくれるらしいので後は以下のaccelerate launch
で実行です。
accelerate launch train.py
DDP(accelerate) ソース説明
公式ドキュメントに従っていろいろ書き換えます。
まずデバイスを以下のように指定します。
# デバイスの指定
accelerator = Accelerator()
device = accelerator.device
prepare関数にmodel, optimizer, dataloaderを通します。
model, optimizer, dataloaders['train'], dataloaders['val'] = accelerator.prepare( model, optimizer, dataloaders['train'], dataloaders['val'])
評価値など、他のノードから情報を集約する場合(評価のときとか)は、以下のようにgather
関数を使います。
all_running_loss = accelerator.gather(running_loss) all_running_corrects = accelerator.gather(running_corrects)
モデルのセーブは以下のように行います
accelerator.wait_for_everyone() unwrapped_model = accelerator.unwrap_model(model) accelerator.save(unwrapped_model.state_dict(), filename)
今回はstate_dictだけ置いておきたかったので、unwrap_model関数のみ使用しました。
詳しくは以下のQuick tourに詳細があります。
以下はメインプロセスのみで行いたい場合の処理です。
if accelerator.is_local_main_process:
時間比較
きちんとした比較じゃないので参考程度に。
- GPU: 4枚
- 1GPUあたりのbatch: 50
- 画像が25000枚なので、端数出さないように10の倍数
- Epoch: 1
- torchvisionのVGG16の事前学習済モデルをfine-tuning
cuda:0
- 4m 35s
train Loss: 0.0136 Acc: 0.9951 val Loss: 0.0240 Acc: 0.9912 Training complete in 4m 35s
nn.DataParallel
- 1m 39s
train Loss: 0.0459 Acc: 0.9817 val Loss: 0.0235 Acc: 0.9908 Training complete in 1m 39s
DDP
- 0m 43s
train Loss: 0.0735 Acc: 0.9691 val Loss: 0.0267 Acc: 0.9914 Training complete in 0m 43s
DDP (accelerate)
- 0m 42s
train Loss: 0.0750 Acc: 0.9682 val Loss: 0.0265 Acc: 0.9908 Training complete in 0m 42s
DDPが一番早いですね。
今までDDPの実装はかなり書き換えが必要で大変でしたが、accelerateを使うと手軽にDDPの実装が可能です。 ぜひ使って見て下さい。
ただ、train lossとtrain accuracyの計測方法がDDPの場合怪しいので、すこし数値がずれています。
最後に
イマイチ理解しておらず、何となくで書いてみました。 シードを固定しているのに値が少し違うのが気になりますが、非同期に動作してるためでしょう。 (実装が間違ってるのかも)
以下のチュートリアルに準拠しているきれいなMNISTソースもあるので、そちらも参考にしてみて下さい。