情報系院生のノート

情報技術系全般,自分用メモを公開してます。

Pytorch Distributed Data Parallel(DDP) 実装例

はじめに

DataParallelといえばnn.DataParallel()でモデルを包んであげるだけで実現できますが、PythonのGILがボトルネックとなり、最大限リソースを活用できません。

最近では、PytorchもDDPを推奨しています。が、ソースの変更点が多く、コーディングの難易度が上がっています。どっちもどっちですね…。

今回はkaggleにある犬と猫の画像データセットを使って、画像の分類問題を例に、DDPを使って見たいと思います。

Dogs vs. Cats Redux: Kernels Edition | Kaggle

Data -> Download All からダウンロード

学習コード

モデルはtorchivisionのVGG16を使いました。 ここのtrain_model関数をベースに、DDP向けに改造します。

Finetuning Torchvision Models — PyTorch Tutorials 1.2.0 documentation

  • DDPはこれを参考にしました。

https://gist.github.com/sgraaf/5b0caa3a320f28c27c12b5efeb35aa4c

  • Dataloaderはこのカーネルを参考にしました。

https://www.kaggle.com/alpaca0984/dog-vs-cat-with-pytorch#Generate-submittion.csv

nn.DataParallelのソース

import time
import copy
from tqdm import tqdm
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torchvision.models as models
from torchvision import datasets
import matplotlib.pyplot as plt

from src.datasets import DogCatDataset

# 設定
IMAGE_SIZE = 224
NUM_CLASSES = 2
BATCH_SIZE = 100
NUM_EPOCH = 1

# シード固定
torch.manual_seed(42)
np.random.seed(42)

# GPUが使えたらGPUにする
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 画像の場所
train_dir = './data/train'
test_dir = './data/test'

# データの前処理を定義
transform = transforms.Compose(
    [
        # 画像のリサイズ
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # tensorに変換
        transforms.ToTensor(),
        #  正規化, 計算した値を入れる
        transforms.Normalize(mean=[0.4883, 0.4551, 0.4170],
                             std=[0.2257, 0.2211, 0.2214])
    ]
)


# 訓練データを取得
train_dataset = DogCatDataset(
    csv_file="./data/train.csv",
    root_dir=train_dir,  # 画像を保存したディレクトリ(適宜書き換えて)
    transform=transform
)

# train val 分割
n_samples = len(train_dataset)  # n_samples is 60000
train_size = int(len(train_dataset) * 0.8)  # train_size is 48000
val_size = n_samples - train_size  # val_size is 48000

# shuffleしてから分割してくれる.
train_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset, [train_size, val_size])
datasets = {'train': train_dataset, 'val': val_dataset}

# Create training and validation dataloaders
dataloaders = {
    x: torch.utils.data.DataLoader(
        datasets[x],
        batch_size=BATCH_SIZE,
        shuffle=True,
        pin_memory=True,
        num_workers=mp.cpu_count()) for x in ['train', 'val']
}


# 転移学習する
model = models.vgg16(pretrained=True, progress=True)
# 最終層を2クラスに書き換え(ImageNetは1000クラス)
model.classifier[6] = nn.Linear(4096, NUM_CLASSES)
# GPUに転送
model = model.to(device)
# # GPU 4つ使えるようにする
# model = nn.DataParallel(model)

# optimizerはSGD
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# ロス関数
criterion = nn.CrossEntropyLoss()

# 学習スタート
# https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
since = time.time()
history = {'accuracy': [],
            'val_accuracy': [],
            'loss': [],
            'val_loss': []}


best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

for epoch in range(NUM_EPOCH):
    print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1))
    print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()  # Set model to training mode
        else:
            model.eval()   # Set model to evaluate mode

        running_loss = 0.0
        running_corrects = 0

        # Iterate over data.
        for inputs, labels in tqdm(dataloaders[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            # track history if only in train
            with torch.set_grad_enabled(phase == 'train'):
                # Get model outputs and calculate loss
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # statistics
            running_loss += loss * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss.item() / len(dataloaders[phase].dataset)
        epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

        print(
            '{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase,
                epoch_loss,
                epoch_acc))

        # deep copy the model
        if phase == 'val' and epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        if phase == 'train':
            history['accuracy'].append(epoch_acc.item())
            history['loss'].append(epoch_loss)
        else:
            history['val_accuracy'].append(epoch_acc.item())
            history['val_loss'].append(epoch_loss) 

    print()

time_elapsed = time.time() - since
print(
    'Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed //
        60,
        time_elapsed %
        60))
print('Best val Acc: {:4f}'.format(best_acc))

# load best model weights
model.load_state_dict(best_model_wts)

# ロードのときにだるいのでcpuに変更して保存
model = model.to('cpu')
torch.save(model.state_dict(), './model/best.pth')


# plot
acc = history['accuracy']
val_acc = history['val_accuracy']
loss = history['loss']
val_loss = history['val_loss']
epochs_range = range(NUM_EPOCH)

plt.figure(figsize=(24, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.savefig("training_results.png")

DDPのソース

import time
import copy
import random
from tqdm import tqdm
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torchvision.models as models
from torchvision import datasets
import matplotlib.pyplot as plt

from src.datasets import DogCatDataset

# DDP
from argparse import ArgumentParser
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

parser = ArgumentParser('DDP usage example')
parser.add_argument('--local_rank', type=int, default=-1, metavar='N', help='Local process rank.')  # you need this argument in your scripts for DDP to work
args = parser.parse_args()

args.is_master = args.local_rank == 0
# init
dist.init_process_group(backend='nccl', init_method='env://')
torch.cuda.set_device(args.local_rank)

# 設定
IMAGE_SIZE = 224
NUM_CLASSES = 2
BATCH_SIZE = 100
NUM_EPOCH = 1

# シード固定
torch.cuda.manual_seed_all(42)
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


# 画像の場所
train_dir = './data/train'
test_dir = './data/test'

# データの前処理を定義
transform = transforms.Compose(
    [
        # 画像のリサイズ
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # tensorに変換
        transforms.ToTensor(),
        #  正規化, 計算した値を入れる
        transforms.Normalize(mean=[0.4883, 0.4551, 0.4170],
                             std=[0.2257, 0.2211, 0.2214])
    ]
)


# 訓練データを取得
train_dataset = DogCatDataset(
    csv_file="./data/train.csv",
    root_dir=train_dir,  # 画像を保存したディレクトリ(適宜書き換えて)
    transform=transform
)

# train val 分割
n_samples = len(train_dataset)  # n_samples is 60000
train_size = int(len(train_dataset) * 0.8)  # train_size is 48000
val_size = n_samples - train_size  # val_size is 48000

# shuffleしてから分割してくれる.
train_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset, [train_size, val_size])
datasets = {'train': train_dataset, 'val': val_dataset}

# Create training and validation dataloaders
dataloaders = {
    x: torch.utils.data.DataLoader(
        datasets[x],
        batch_size=BATCH_SIZE,
        pin_memory=True,
        sampler=DistributedSampler(datasets[x], rank=args.local_rank),
        num_workers=mp.cpu_count()) for x in ['train', 'val']
}


# 転移学習する
model = models.vgg16(pretrained=True, progress=True)
# 最終層を2クラスに書き換え(ImageNetは1000クラス)
model.classifier[6] = nn.Linear(4096, NUM_CLASSES)
# GPUに転送
model = model.cuda()
# DDP
model = DDP(
        model,
        device_ids=[args.local_rank]
    )

# optimizerはSGD
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# ロス関数
criterion = nn.CrossEntropyLoss()

# 学習スタート
# https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
since = time.time()
history = {'accuracy': [],
            'val_accuracy': [],
            'loss': [],
            'val_loss': []}


best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

for epoch in range(NUM_EPOCH):
    if args.is_master:
        print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1))
        print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:        
        dist.barrier()  # let all processes sync up before starting with a new epoch of training
        
        if phase == 'train':
            model.train()  # Set model to training mode
        else:
            model.eval()   # Set model to evaluate mode

        running_loss = 0.0
        running_corrects = 0
      
        # Iterate over data.
        for inputs, labels in tqdm(dataloaders[phase]):
            inputs = inputs.cuda()
            labels = labels.cuda()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            # track history if only in train
            with torch.set_grad_enabled(phase == 'train'):
                # Get model outputs and calculate loss
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # statistics
            running_loss += loss * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        # 他のノードから集める
        dist.all_reduce(running_loss, op=dist.ReduceOp.SUM)
        dist.all_reduce(running_corrects, op=dist.ReduceOp.SUM)
        
        if args.is_master:
            epoch_loss = running_loss.item() / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print(
                '{} Loss: {:.4f} Acc: {:.4f}'.format(
                    phase,
                    epoch_loss,
                    epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

            if phase == 'train':
                history['accuracy'].append(epoch_acc.item())
                history['loss'].append(epoch_loss)
            else:
                history['val_accuracy'].append(epoch_acc.item())
                history['val_loss'].append(epoch_loss) 

    print()

time_elapsed = time.time() - since
if args.is_master:
    print(
        'Training complete in {:.0f}m {:.0f}s'.format(
            time_elapsed //
            60,
            time_elapsed %
            60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)

    # ロードのときにだるいのでcpuに変更して保存
    model = model.to('cpu')
    torch.save(model.module.state_dict(), './model/best.pth')


    # plot
    acc = history['accuracy']
    val_acc = history['val_accuracy']
    loss = history['loss']
    val_loss = history['val_loss']
    epochs_range = range(NUM_EPOCH)

    plt.figure(figsize=(24, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.savefig("training_results.png")

# destrory all processes
dist.destroy_process_group()

実行コマンド

python -m torch.distributed.launch  --nproc_per_node=4 --nnodes=1 --node_rank 0 train.py

ソース説明

  • ArgumentParser--local_rankというargumentを付与します。このrankがGPUノードの番号を指しており、自動でGPUを割り振ってくれます。
parser = ArgumentParser('DDP usage example')
parser.add_argument('--local_rank', type=int, default=-1, metavar='N', help='Local process rank.')  # you need this argument in your scripts for DDP to work
  • DistributedSamplerを使います。pin_memoryは、データのキャッシュ関係のフラグで少し速くなるようです。
dataloaders = {
    x: torch.utils.data.DataLoader(
        datasets[x],
        batch_size=BATCH_SIZE,
        pin_memory=True,
        sampler=DistributedSampler(datasets[x], rank=args.local_rank),
        num_workers=mp.cpu_count()) for x in ['train', 'val']
}
  • VGG16を使います。最終層を任意のクラス数に書き換えて、fine-tuningを行います。
  • DDPでラップしてあげます。
# 転移学習する
model = models.vgg16(pretrained=True, progress=True)
# 最終層を2クラスに書き換え(ImageNetは1000クラス)
model.classifier[6] = nn.Linear(4096, NUM_CLASSES)
# GPUに転送
model = model.cuda()
# DDP
model = DDP(
        model,
        device_ids=[args.local_rank]
    )
  • エポックが始まる前に、全GPUで同期を取ります。
        dist.barrier()  # let all processes sync up before starting with a new epoch of training
  • lossとaccをそれぞれのGPUで算出して、ここで一箇所に集めます。
        # 他のノードから集める
        dist.all_reduce(running_loss, op=dist.ReduceOp.SUM)
        dist.all_reduce(running_corrects, op=dist.ReduceOp.SUM)
  • モデルの保存、ログ関係はrankが0のGPUでのみ行います。
if args.is_master:

時間比較

きちんとした比較じゃないので参考程度に。

  • GPU: 4枚
  • 1GPUあたりのbatch: 100
    • 画像が25000枚なので、端数出さないように10の倍数
  • Epoch: 1
  • torchvisionのVGG16の事前学習済モデルをfine-tuning

cuda:0

  • 5m 26s
train Loss: 0.0548 Acc: 0.9764
val Loss: 0.0266 Acc: 0.9898

Training complete in 5m 26s

nn.DataParallel

  • 2m 4s
train Loss: 0.0559 Acc: 0.9764
val Loss: 0.0300 Acc: 0.9896

Training complete in 2m 4s

DDP

  • 1m 29s
train Loss: 0.1130 Acc: 0.9507
val Loss: 0.0122 Acc: 0.9882

Training complete in 1m 29s

DDPが一番早いですね。

しかし、思ったよりDataParallelと差が出なかったので、これだったらマルチノードで学習したいとかが無ければ、DataParallelを使ったほうが良いかもしれません。

最後に

イマイチ理解しておらず、何となくで書いてみました。 シードを固定しているのに値が少し違うのが気になりますが、非同期に動作してるためでしょう。 (実装が間違ってるのかも)

以下のチュートリアルに準拠しているきれいなMNISTソースもあるので、そちらも参考にしてみて下さい。

srijithr.gitlab.io