rossiXYZ

[源码解析] PyTorch 分布式(5) ------ DistributedDataParallel 总述&如何使用

0x00 摘要

本文是 PyTorch 分布式系列的第五篇,以几篇官方文档的翻译为基础,加入了自己的一些思考,带领大家进入DistributedDataParallel,在后续会用5~6篇左右做深入分析。

本系列其他文章如下:

深度学习利器之自动微分(1)

深度学习利器之自动微分(2)

[源码解析]深度学习利器之自动微分(3) --- 示例解读

[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)

[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)

[源码解析] PyTorch如何实现前向传播(3) --- 具体实现

[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎

[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构

[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法

[源码解析] PyTorch 分布式(1)------历史和概述

[源码解析] PyTorch 分布式(2) ----- DataParallel(上)

[源码解析] PyTorch 分布式(3) ----- DataParallel(下)

[源码解析] PyTorch 分布式(4)------分布式应用基础概念

0x01 数据并行

因为DistributedDataParallel 是数据并行,所以我们首先通过两个图,复习一下什么是数据并行。

第一个图片来自 https://www.cnblogs.com/yh-blog/p/12877922.html,其原始出处未知。

我们可以看到,模型并行与数据并行的区别。

第二张图来自fairscale github源码,清晰的给出了一个数据并行的运行模式,具体包括:

模型分片,本地前向计算,本地反向传播,AllReduce来同步梯度,本地更新梯度这几步。

0x02 DDP 运行逻辑

Torch.distributed 包 为多个计算节点的 PyTorch 提供多进程并行通信原语,可以并行化跨进程和跨集群的计算。torch.nn.parallel.DistributedDataParallel基于torch.distributed 包的功能提供了一个同步分布式训练wrapper,这个wrapper可以对 PyTorch 模型封装进行训练。其核心功能是基于多进程级别的通信,与Multiprocessing package - torch.multiprocessing 和 DataParrallel 提供的并行性有明显区别。

以下是 DDP 的整体架构,大家可以看到ddp在整个架构之中的位置,依赖项等等。图片来自来自源码。

我们通过一个图来说明 DDP 的运行逻辑。

图片来自 https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/

具体逻辑如下:

  1. 加载模型阶段。每个GPU都拥有模型的一个副本,所以不需要拷贝模型。rank为0的进程会将网络初始化参数broadcast到其它每个进程中,确保每个进程中的模型都拥有一样的初始化值。
  2. 加载数据阶段。DDP 不需要广播数据,而是使用多进程并行加载数据。在 host 之上,每个worker进程都会把自己负责的数据从硬盘加载到 page-locked memory。DistributedSampler 保证每个进程加载到的数据是彼此不重叠的。
  3. 前向传播阶段。在每个GPU之上运行前向传播,计算输出。每个GPU都执行同样的训练,所以不需要有主 GPU。
  4. 计算损失。在每个GPU之上计算损失。
  5. 反向传播阶段。运行后向传播来计算梯度,在计算梯度同时也对梯度执行all-reduce操作。
  6. 更新模型参数阶段。因为每个GPU都从完全相同的模型开始训练,并且梯度被all-reduced,因此每个GPU在反向传播结束时最终得到平均梯度的相同副本,所有GPU上的权重更新都相同,也就不需要模型同步了。注意,在每次迭代中,模型中的Buffers 需要从rank为0的进程广播到进程组的其它进程上。

0x03 VS DataParallel

3.1 本质区别

既然 DataParallel 可以进行数据并行训练,那么为什么还需要提出 DistributedDataParallel呢?这里我们就需要知道两种方法的实现原理与区别:

  • 大型模型训练。

    • 如果模型太大而无法容纳在单个 GPU 上,则必须使用模型并行将其拆分到多个 GPU 中。
      • DataParallel 因为必须将模型放入单块 GPU 中,所以难以完成大型模型的训练,即,无法和模型并行(跨多个 GPU 拆分单个模型)一起合作。
      • DistributedDataParallel 可以只包括大型模型的一部分,因此可以与模型并行一起合作。
    • 如果数据太大而无法容纳在一台计算机上,则需要使用数据并行。
      • 在这种情况下,每个 DistributedDataParallel 进程都可以并行使用模型,而所有进程都将并行使用数据。此时与 DP 没有太大区别。
    • 如果您的模型需要跨越多台机器,或者您的用例不适合数据并行性范式,请参阅 RPC API ,以获得更多通用的分布式训练支持。
  • 多进程还是多线程:

    • DataParallel 是单进程,多线程的并行训练方式,并且只能在单台机器上运行。
    • 而DistributedDataParallel 是多进程,并且适用于单机和多机训练。DistributedDataParallel 还预先复制模型,而不是在每次迭代时复制模型,并避免了全局解释器锁定。
      • 每个进程维护自己的优化器,并且在每次迭代中执行一个完整的优化步骤。由于梯度已经聚合(gather)并跨进程平均,因此梯度对于每个进程都是相同的,这就不需要广播参数步骤,因此减少了在节点之间传输张量的时间。
      • 每个进程包含一个独立的 Python 解释器,因而消除了单个 Python 进程驱动多个执行线程、模型副本或者 GPU 的额外解释器开销和"GIL 颠簸"(GIL-thrashing)。对于严重依赖 Python 运行时的模型(比如说包含 RNN 层或大量小组件的 models )这尤其重要。
    • 即使在单台机器上,DataParallel通常也比DistributedDataParallel慢,这是因为跨线程的 GIL 争用,每次迭代复制的模型以及分散输入和收集输出所带来的额外开销。

3.2 实现区别

DDP 与DP在具体实现上的区别如下:

  • 关于优化器:
    • DDP :在每次迭代之中,DDP 的每个进程都有自己的 optimizer ,每个进程都独立完成所有优化步骤,这和非分布式训练一样。
    • DP :在 DP 中只有一个 optimizer,在主线程执行。其对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU
  • 关于梯度。
    • DDP :每个进程在自己 GPU之上计算损失,运行后向传播来计算梯度,在计算梯度同时对梯度执行all-reduce操作。
    • DP :在各进程梯度计算完成之后,各进程需要将梯度进行汇总规约到主进程,主进程用梯度来更新模型权重,然后其 broadcast 模型到所有进程(其他GPU)进行下一步训练。
  • 关于传播数据:
    • DDP :只对梯度等少量数据进行交换。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。相较于 DataParallel来说,torch.distributed 传输的数据量更少,因此速度更快,效率更高。
    • DP :每次迭代,有大量交互,比如模型,前向输出,损失,梯度等。

0x04 使用

Pytorch 中分布式的基本使用流程如下:

  1. 首先需要使用 init_process_group 初始化进程组,同时初始化 distributed 包,然后才能使用 distributed 包的其他函数。
  2. 如果需要进行组内集体通信,用 new_group 创建子分组。
  3. 使用 DDP(model, device_ids=device_ids) 创建 DistributedDataParalle 模型。
  4. 为数据集创建分布式 Sampler
  5. 使用启动工具 torch.distributed.launch 在每个主机上执行脚本,开始训练。
  6. 使用 destory_process_group() 销毁进程组。

4.1 基本示例

首先,我们使用 https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 来看看。

4.1.1 设置进程组

在示例的最开始,我们首先要正确设置进程组。

init_process_group 的参数解释如下:

  • "gloo" 说明后端使用 "gloo"。
  • rank 是本进程对应的rank,如果是0,则说明本进程是 master 进程,负责广播模型状态等工作。
  • world_size 指的是总的并行进程数目,如果连接的进程数小于world_size,进程就会阻塞在 init_process_group之上,如果达到了 world_size,程序才会继续运行。如果 batch_size = 16,那么总体的batch size 就是 16 * world_size。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size) # 这条命令之后,master进程就处于等待状态

def cleanup():
    dist.destroy_process_group()

4.1.2 简单模型

现在,让我们创建一个简单模块,用 DDP 包装它,并用一些虚拟输入数据馈送它。请注意,由于 DDP 将模型状态从 rank 0 进程广播到 DDP 构造函数中的所有其他进程,因此对于所有 DDP 进程来说,它们的起始模型参数是一样的,用户无需担心不同的 DDP 进程从不同的模型参数初始值开始。

                         +-----------+
                         |           |
                         |  Rank 0   |
                         |           |
                         +-----+-----+
                               |
                               |  Model Parameters
                               |
                               |
     +---------------+---------v----------------------+
     |               |                                |
     |               |                                |
     |               |                                |
     |               |                                |
     v               v                                v
+----+-----+    +----+-----+                      +---+-------+
|          |    |          |                      |           |
|  Rank 1  |    |  Rank 2  |    ......            |  Rank n   |
|          |    |          |                      |           |
+----------+    +----------+                      +-----------+

DDP 包装了较低级别的分布式通信细节,并提供了一个干净的 API,就好像它是一个本地模型一样。梯度同步通信发生在反向传播期间,并与反向计算重叠。当backward()返回时,param.grad已经包含同步梯度张量。因为DDP 封装了分布式通信原语,所以模型参数的梯度可以进行 all-reduce。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

具体如下图

+--------------------------+                   +------------------------+
| torch.optim.SGD          |                   | DDP                    |
|                          |    parameters()   |                        |
|                          |                   |      +------------+    |
|                          | <-----------------+      |            |    |
|                          |                   |      |  ToyModel  |    |
|                          |                   |      |            |    |
|                          |                   |      +------------+    |
|                          |                   |                        |
+--------------------------+                   +--------+---------------+
                                                        |
                                                        |
                                                        |  forward outputs
                                                        |
                                                        |
                                                        v

                                               +-------------------------+
                                               | nn.MSELoss()            |
                                               |                         |
                                               |                         |
                                               |                         |
                                               |                         |
                                               +-------------------------+

4.1.3 处理速度偏差

在 DDP 中,构造函数、前向传递和后向传递是分布式同步点。我们期望不同的进程会启动相同数量的同步操作,并在大致相同的时间以相同的顺序到达这些同步点。否则,进度快的进程可能会提前到达同步点,如果快进程等待落后者的时间过长,那么先到的进程会超时。

因此,用户需要负责平衡进程间的工作负载分布。有时,由于网络延迟,资源争用,不可预测的工作负载峰值等原因,处理速度的偏差是不可避免的。为避免在这些情况下超时,请确保在调用 init_process_group 时。timeout这个参数传递足够大的值 。

4.1.4 保存和加载检查点

一般来说,用户可以使用torch.savetorch.load作为checkpoints,以便从检查点恢复训练。

在使用 DDP 时,一种优化是只在一个进程中保存模型,然后在所有进程中加载模型,从而减少写入开销(这其实很像数据库中的读写分离)。因为所有进程都从相同的参数开始,并且在反向传递中同步梯度,所以优化器应该将参数设置为相同的值。如果使用此优化,请确保在保存完成之前所有进程都不会开始加载。

此外,在加载模块时,您需要提供适当的map_location 参数,以防止一个进程进入他人的设备。如果map_location 缺失,torch.load将首先将模块加载到 CPU,然后将每个参数复制到它之前保存的地方,这将导致同一台机器上的所有进程使用相同的一组设备。

有关更高级的故障恢复和弹性支持,请参阅TorchElastic后续也会有专门系列介绍弹性部分

从下图可以看出来,Rank 0 负责保存模型到存储之上,其他 Rank 会加载模型到其本地。

                   +-----------+
                   |           |
                   |  Rank 0   |
                   |           |
                   +-----+-----+
                         |
                    save |  Model Parameters
                         |
                         |
                         v
                 +-------+------+
                 |              |
     +-----------+  Model file  +---------------------+
     |           |              |                     |
     |           +---+----------+                     |
     |               |                                |
     |               |                                |
     |               |                                |
     |               |                                |
     |load           |load                      load  |
     |               |                                |
     |               |                                |
     |               |                                |
     |               |                                |
     v               v                                v
+----+-----+    +----+-----+                      +---+-------+
|          |    |          |                      |           |
|  Rank 1  |    |  Rank 2  |    ......            |  Rank n   |
|          |    |          |                      |           |
+----------+    +----------+                      +-----------+

具体如下:

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

4.2 将 DDP 与模型并行相结合

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 后半部分是与模型并行的结合,我们一起来看看。

DDP 也适用于多 GPU 模型。DDP 在使用大数据训练大模型时候特别有用。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

注意,当把一个多GPU 模型传递给DDP时候,不能设置device_idsoutput_device

输入和输出数据将通过应用程序或模型forward()方法来放置在适当的设备中。

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = (rank * 2) % world_size
    dev1 = (rank * 2 + 1) % world_size
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    run_demo(demo_model_parallel, world_size)

请注意,这里没有使用 Sampler,正常在使用之中,需要用DistributedSampler来配合 DDP 使用,DistributedSampler 会把数据集样本针对每个进程来划分,这样每个进程就读取到了自己应该使用的样本,而且 DistributedSampler 会为 DDP 模式使用 set_epoch 来shuffle数据集。

0x05 如何多进程启动

前面提到,如果应用程序需要跨机器边界进行扩展,需要使用多机 DistributedDataParallel 和 启动脚本。torch.nn.parallel.DistributedDataParallel() 支持多个通过网络互联的机器,用户必须为每个进程显式启动一个主训练脚本。

我们下面就看看这个启动脚本 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。以下就是这个md文件的翻译。

在本教程中,我们将演示如何构建分布式模型训练应用程序,这样它可以在多个节点上方便地启动。这里每个节点都有多个 GPU,并且使用 PyTorch 的分布式启动程序脚本 https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py 启动实用程序torch.distributed.launch,此脚本程序可用于为每个节点启动多个进程以进行分布式训练,它在每个训练节点上产生多个分布式训练进程。

这个工具可以用作CPU训练或者GPU 训练,如果被用于GPU,每个GPU产生一个进程Process。该工具既可以用来做单节点多GPU训练,也可用于多节点多GPU训练。

  • 如果是单节点多GPU,将会在单个GPU上运行一个分布式进程,据称可以非常好地改进单节点训练性能。
  • 如果用于多节点分布式训练,则通过在每个节点上产生多个进程来获得更好的多节点分布式训练性能。如果有Infiniband接口则加速比会更高。

在 单节点分布式训练 或 多节点分布式训练 的两种情况下,该工具将为每个节点启动给定数量的进程(--nproc_per_node)。如果用于GPU培训,则此数字需要小于或等于当前系统上的GPU数量(nproc_per_node),每个进程将在从GPU 0到GPU(nproc_per_node - 1)的单个GPU上运行。

5.1 先决条件

多个worker通过处理大型数据集的不同部分来训练同一个全局模型,每个worker将独立计算局部梯度(也称为梯度 sub-gradients),然后使用 AllReduce 原语来同步梯度。因为同一个程序在所有应用上运行,但每个应用都在训练数据集的不同部分上运行,所以在 HPC 术语中,这种执行模型称为单程序多数据或 SPMD,

5.2 应用进程拓扑

一个分布式数据并行 (DDP) 应用程序可以在多个节点上执行,其中每个节点可以由多个 GPU 设备组成。每个节点依次可以运行 DDP 应用程序的多个副本,每个副本在多个 GPU 上处理其模型。

N为运行应用程序的节点数, G为每个节点的 GPU 数。同时在所有节点上运行的应用程序进程总数称为 World Size,简写为W。在每个节点上运行的进程数称为Local World Size,简写为L

每个应用进程都分配了两个 ID:local rank 取值在 [0, L -1] 中,global rank 取值在 [0, W -1] 之中。

为了阐明上面定义的术语,我们考虑在两个节点上启动 DDP 应用程序的情况,每个节点都有四个 GPU。然后我们希望每个进程跨越(span)两个 GPU。进程到节点的映射如下图所示:

下面图片也出自于 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。

虽然有很多方法可以将进程映射到节点,但一个好的经验法则是让一个进程跨越(span)单个 GPU。这使得 DDP 应用程序能够拥有与 GPU 一样多的并行读取流,并且在现实中也提供了 I/O 和计算成本之间的良好平衡。

5.3 准备和启动 DDP 应用程序

无论 DDP 应用程序采用何种启动方式,每个进程都需要一种机制来了解其全局和本地等级。所以,所有进程会创建一个ProcessGroup,基于ProcessGroup可以使它们能够参与诸如 AllReduce 之类的集合通信操作。

有一种便捷的方法可以启动多个 DDP 进程,并且可以初始化所有参数(这些数值是建立一个ProcessGroup 所需要的),这就是使用PyTorch 提供的分布式 脚本launch.py

这个 Launcher 可以在本地torch 安装目录的distributed子目录下找到。这是在任何操作系统上获取launch.py路径的快捷方法 :

python -c " from os import path; import torch; print(path.join(path.dirname(torch.__file__), 'distributed', 'launch.py')) "

这将打印如下内容:

/home/username/miniconda3/envs/pytorch/lib/python3.8/site-packages/torch/distributed/launch.py

当 DDP 应用程序通过 launch.py启动时,它通过环境变量将 world size、 global rank、local rank,master address 和端口作为命令行参数传递给每个实例。要使用 Launcher,应用程序需要遵守以下约定:

  • 必须为单个 worker提供入口点函数。例如,它不应该使用torch.multiprocessing.spawn启动子进程。
  • 必须使用环境变量来初始化进程组。

为简单起见,应用程序可以假设每个进程映射到单个 GPU,但在下一节中,我们还将展示如何用更通用的办法来执行进程到 GPU 的映射。

5.4 示例应用

此示例 DDP 应用程序基于 DDP 教程 的 “Hello, World” 应用。

5.4.1 参数传递约定

DDP 应用程序采用两个命令行参数:

  1. --local_rank: 此参数将通过 launch.py传入。
  2. --local_world_size:这是明确传递的,通常是数字 \(1\) 或每个节点的 GPU 数量。

应用程序解析这些并调用spmd_main入口点:

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0)
    parser.add_argument("--local_world_size", type=int, default=1)
    args = parser.parse_args()
    spmd_main(args.local_world_size, args.local_rank)

spmd_main之中,进程组使用后端(NCCL 或 Gloo)进行初始化。集合点(rendezvous )所需的其余信息来自launch.py设置的环境变量:

def spmd_main(local_world_size, local_rank):
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    print(
        f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
    )

    demo_basic(local_world_size, local_rank)

    # Tear down the process group
    dist.destroy_process_group()

给定 local rank 和 world size,训练函数demo_basic将通过device_ids在本地节点的一组 GPU 上初始化DistributedDataParallel模型:

def demo_basic(local_world_size, local_rank):

    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
    # rank 0 uses GPUs [0, 1, 2, 3] and
    # rank 1 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // local_world_size
    device_ids = list(range(local_rank * n, (local_rank + 1) * n))

    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
    )

    model = ToyModel().cuda(device_ids[0])
    ddp_model = DDP(model, device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()

该应用程序可以通过launch.py以下方式在一个 8 GPU 的节点上启动,每个 GPU 一个进程:

python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8

并产生类似于下图所示的输出:

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
[238627] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '8'}
[238630] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '3', 'WORLD_SIZE': '8'}
[238628] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '1', 'WORLD_SIZE': '8'}
[238634] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[238631] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[238632] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[238629] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '2', 'WORLD_SIZE': '8'}
[238633] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[238633] world_size = 8, rank = 6, backend=nccl
[238628] world_size = 8, rank = 1, backend=nccl
[238629] world_size = 8, rank = 2, backend=nccl
[238631] world_size = 8, rank = 4, backend=nccl
[238630] world_size = 8, rank = 3, backend=nccl
[238632] world_size = 8, rank = 5, backend=nccl
[238634] world_size = 8, rank = 7, backend=nccl
[238627] world_size = 8, rank = 0, backend=nccl
[238633] rank = 6, world_size = 8, n = 1, device_ids = [6]
[238628] rank = 1, world_size = 8, n = 1, device_ids = [1]
[238632] rank = 5, world_size = 8, n = 1, device_ids = [5]
[238634] rank = 7, world_size = 8, n = 1, device_ids = [7]
[238629] rank = 2, world_size = 8, n = 1, device_ids = [2]
[238630] rank = 3, world_size = 8, n = 1, device_ids = [3]
[238631] rank = 4, world_size = 8, n = 1, device_ids = [4]
[238627] rank = 0, world_size = 8, n = 1, device_ids = [0]

同样,它可以使用一个跨越(span)所有 8 个 GPU 的单进程来启动:

python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=1 example.py --local_world_size=1

为当前主机创建 nproc_per_node 个进程,每个进程独立执行训练脚本,同时还为每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。

比如 node_rank = 2, local_rank = 0,表示 node_rank 第2个节点,上第一个进程。

依次产生以下输出

[262816] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '1'}
[262816]: world_size = 1, rank = 0, backend=nccl
[262816] rank = 0, world_size = 1, n = 8, device_ids = [0, 1, 2, 3, 4, 5, 6, 7]

5.5 结论

作为分布式数据并行应用程序的作者,您的代码需要了解两种类型的资源:计算节点和每个节点内的 GPU。但是需要跟踪GPU集如何映射到应用程序进程,这个簿记(bookkeeping )工作可能既乏味又容易出错。

所以我们希望通过按照本示例所示的方法,使用 launcher 来构建您的应用程序,这样可以显著简化分布式训练的设置。

5.6 启动脚本的背后

知道了启动脚本的作用依然不够,我们还需要知道其内部做了什么。

5.6.1 launch.py

launch.py 位于 torch/distributed/launch.py,但是实际上,它的大部分功能都被转移到了 torch/distributed/run.py 之中。

def main(args=None):
    logger.warn(
        "The module torch.distributed.launch is deprecated "
        "and going to be removed in future."
        "Migrate to torch.distributed.run"
    )
    args = parse_args(args)
    run(args)

所以我们要看看 run.py。

5.6.2 run.py

可以看到,run.py 的基本思路就是:使用 config_from_args 来从命令行之中提取信息,构建了对应的配置,执行语句和其参数,然后调用 elastic_launch 来执行。由此可见,弹性训练是未来趋势。我们后续也有系列来分析弹性训练

def run(args):
    if args.standalone:
        args.rdzv_backend = "c10d"
        args.rdzv_endpoint = "localhost:29400"
        args.rdzv_id = str(uuid.uuid4())
        log.info(
            f"\n**************************************\n"
            f"Rendezvous info:\n"
            f"--rdzv_backend={args.rdzv_backend} "
            f"--rdzv_endpoint={args.rdzv_endpoint} "
            f"--rdzv_id={args.rdzv_id}\n"
            f"**************************************\n"
        )

    config, cmd, cmd_args = config_from_args(args)
    elastic_launch(
        config=config,
        entrypoint=cmd,
    )(*cmd_args)

run.py 也可以独立运行,比如。

>>> python -m torch.distributed.run
    --nnodes=$NUM_NODES
    --nproc_per_node=$NUM_TRAINERS
    --rdzv_id=$JOB_ID
    --rdzv_backend=c10d
    --rdzv_endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

5.6.3 定义

因为run.py 有很多配置参数,所以我们大致看一下。

  1. Node - 物理实例或容器;映射到与 job manager 所协调的单元。

  2. Worker - 分布式培训环境中的worker。

  3. WorkerGroup - 执行相同功能的一组worker(例如trainer)。

  4. LocalWorkerGroup - 在同一节点上运行的工作组中的workers子集。

  5. RANK - 工作组中worker的rank,是全局rank,可以认为是一个全局GPU资源列表。

  6. LOCAL_RANK - 本地工作组中,某个worker 的 rank,可以认为是当前节点上的GPU资源列表。

  7. GROUP_RANK - worker group的rank。介于0和“最大节点数”之间的数字。如果每个节点运行一个单一工作组,那就是这个节点的rank。

  8. ROLE_RANK - 对于具有相同角色worker来说,他们之间共享的rank,角色在“WorkerSpec”中被指定。

  9. WORLD_SIZE - 工作组中worker的总数。因为节点会加入/离开,所以WORLD_SIZE会变化,不能依赖 WORLD_SIZE的稳定性进行编码。

  10. LOCAL_WORLD_SIZE - 本地工作组的大小,即本地运行的worker数目,等于在torch.distributed.run运行时候指定的--nproc_per_node。目前,torch/distributed/run.py 仅支持同构的 LOCAL_WORLD_SIZE。也就是说,假设所有节点运行相同数量的本地工作者(每个角色)。

  11. ROLE_WORLD_SIZE - 具有同样角色的workers总数,在 WorkerSpec之中被指定。

  12. rdzv_id - 用户定义的id,用于唯一标识作业的工作组。这个id在每个节点加入特定工作组时候使用。

  13. rdzv_backend-rendezvous 的后端(例如“c10d”)。这通常是一个强一致性的键值存储。

  14. rdzv_endpoint - rendezvous 后端端点;通常以“<host>:<port>”的形式出现。

  15. run_id: 用户定义的id,它唯一地标识分布式应用程序的一个实例。它通常映射到作业id并用于

    允许节点加入正确的分布式应用程序。

  16. TORCHELASTIC_RESTART_COUNT - 迄今为止,工作组重启的次数。

  17. TORCHELASTIC_MAX_RESTARTS - 配置的最大重启数目。

  18. TORCHELASTIC_RUN_ID - 与 rendezvous run_id 相等,即唯一的job id。

我们后面会有专门系列来介绍弹性训练,所以就此略过。下一篇我们开始介绍通信所需要的store概念,敬请期待。

0xFF 参考

https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

相关文章:

猜你喜欢