rossiXYZ

[源码解析] PyTorch分布式优化器(3)---- 模型并行

0x00 摘要

本系列介绍分布式优化器,分为三篇文章,分别是基石篇,DP/DDP/Horovod 之中数据并行的优化器,PyTorch 分布式优化器,按照深度递进。本文介绍PyTorch 分布式优化器和PipeDream之中的优化器,主要涉及模型并行(流水线并行)。

PyTorch分布式其他文章如下:

深度学习利器之自动微分(1)

深度学习利器之自动微分(2)

[源码解析]深度学习利器之自动微分(3) --- 示例解读

[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)

[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)

[源码解析] PyTorch如何实现前向传播(3) --- 具体实现

[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎

[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构

[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法

[源码解析] PyTorch 分布式(1)------历史和概述

[源码解析] PyTorch 分布式(2) ----- DataParallel(上)

[源码解析] PyTorch 分布式(3) ----- DataParallel(下)

[源码解析] PyTorch 分布式(4)------分布式应用基础概念

[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用

[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store

[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组

[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇

[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

[源码解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer静态架构

[源码解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 构建Reducer和Join操作

[源码解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向传播

[源码解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向传播

[源码解析] PyTorch 分布式 Autograd (1) ---- 设计

[源码解析] PyTorch 分布式 Autograd (2) ---- RPC基础

[源码解析] PyTorch 分布式 Autograd (3) ---- 上下文相关

[源码解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎

[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

[源码解析] PyTorch 分布式 Autograd (6) ---- 引擎(下)

[源码解析] PyTorch分布式优化器(1)----基石篇

[源码解析] PyTorch分布式优化器(2)----数据并行优化器

为了更好的说明,本文代码会依据具体情况来进行相应精简。

0x01 前文回顾

之前无论是 DP, DDP,或者 Horovod,实质上的都是处理数据并行,比如 DDP 将相同的模型复制到所有 GPU,其中每个 GPU 使用输入数据的不同分区。虽然它可以显着加速训练过程,但它不适用于模型太大而无法放入单个 GPU 的某些用例。于是人们引入了模型并行(model parallel)。

与此对应,优化器也需要做不同的修改以适应模型并行的需求。为了更好的分析,本文首先介绍单机模型并行,然后介绍PyTorch分布式优化器。

0x02 单机模型

下面文字翻译自 https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html ,加入了一些自己的思考和理解。

模型并行被广泛用于分布式训练。与DataParallel相比,模型并行将单个模型拆分到不同的 GPU 上,而不是在每个 GPU 上复制整个模型(具体来说,假设一个模型 m包含 10 层,当使用DataParallel,每个 GPU 将拥有这 10 层的全部副本,而当在两个 GPU 上使用模型并行时,每个 GPU 可以托管 5 层)。

模型并行的高级思想是将模型的不同子网络放置在不同的设备上,并相应地实现该forward方法以便跨设备移动中间输出。由于单个设备上只有模型的一部分在运行,因此一组设备可以共同服务于一个更大的模型。

在这篇文章中,我们不会尝试构建巨大的模型并将它们压缩到有限数量的 GPU 中。相反,这篇文章侧重于展示模型并行的想法。读者可以将这些想法应用到实际应用中。

2.1 基本用法

让我们从一个包含两个线性层的玩具模型开始。要在两个 GPU 上运行这个模型,只需将每个线性层放在不同的 GPU 上,并相应地移动输入和中间输出以匹配层设备。

import torch
import torch.nn as nn
import torch.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to(\'cuda:0\')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(\'cuda:1\')

    def forward(self, x):
        x = self.relu(self.net1(x.to(\'cuda:0\')))
        return self.net2(x.to(\'cuda:1\'))

ToyModel的代码看起来与在单个 GPU 上的实现方式非常相似。只是修改了两个部分:网络构造部分和forward部分。

  • __init__方法使用了两个to(device)语句用来在适当的设备上放置线性层,这样就把整个网络拆分成两个部分,然后就可以分别运行在不同的GPU之上。
  • forward 方法使用了两个to(device)语句用来在适当的设备上放置张量,这样可以把一个layer的输出结果通过tensor.to的语义拷贝到另一个layer所在的GPU上。

这是模型中唯一需要更改的地方。backward()torch.optim会可以应付这种情况,它们自动接管梯度,仿佛模型是一个GPU之上。在调用损失函数时,您只需要确保标签与网络的输出在同一设备上。

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(\'cuda:1\')
loss_fn(outputs, labels).backward()
optimizer.step()

这里最重要的是 labels = torch.randn(20, 5).to(\'cuda:1\'),这保证了标签在 cuda:1\'。

回忆一下之前forward的代码:self.net2(x.to(\'cuda:1\'))。这两行代码确保标签与输出在同一设备 cuda:1\' 上。

初始化之后如下:

+--------------------+                       +------------------------+
| cuda:0             |                       | cuda:1                 |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
|       net1(x)      |                       |        net2(x)         |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
+--------------------+                       +------------------------+

forward 操作和设定label之后如下,现在输出和label都在GPU 1 之上:

               +--------------------+                       +------------------------+
               | cuda:0             |                       | cuda:1                 |
               |                    |                       |                        |
               |                    |                       |                        |
               |                    |                       |                        |
x.to(\'cuda:0\')-------> net1(x)  +-------> x.to(\'cuda:1\') +-------->  net2(x)         |
               |                    |                       |                        |
               |                    |                       |   labels.to(\'cuda:1\')  |
               |                    |                       |                        |
               +--------------------+                       +------------------------+

2.2 将模型并行应用到现有模块

还可以通过更改几行代码把一个现有的单 GPU 模块转换到在多个 GPU 上运行。下面的代码展示了如何分解 torchvision.models.resnet50()到两个 GPU之上。基本想法是继承现有ResNet模块,并在构建过程中将层拆分为两个 GPU。然后,重载forward方法以便把两个子网络拼接起来,forward具体是通过相应地移动中间输出来完成。

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to(\'cuda:0\')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to(\'cuda:1\')

        self.fc.to(\'cuda:1\')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to(\'cuda:1\'))
        return self.fc(x.view(x.size(0), -1))

上述实现解决了模型太大而无法放入单个 GPU 的情况下的问题。但是,您可能已经注意到,即使您的模型适合这种情况,它也许会比在单个 GPU 上运行要慢。这是因为,在任何时候,两个 GPU 中只有一个在工作,而另一个坐在那里什么也不做。在 layer2layer3 之中需要把中间输出从cuda:0拷贝到 cuda:1,这将进一步引起性能恶化。

让我们运行一个实验,以更从一个可以量化地角度来了解执行时间。在这个实验中,我们通过运行随机输入和标签来训练ModelParallelResNet50和现有 torchvision.models.resnet50()。训练后,模型不会产生任何有用的预测,但我们可以对执行时间有一个合理的了解。

import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to(\'cuda:0\'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()

上述train(model)方法使用nn.MSELoss用作损失函数,使用optim.SGD作为优化器。它模仿 128 X 128图像的训练,这些图像被组织成 3 个批次,每批次包含 120 个图像。然后,我们使用timeit来运行 train(model) 10 次,并且用标准差来绘制执行时间。

import matplotlib.pyplot as plt
plt.switch_backend(\'Agg\')
import numpy as np
import timeit

num_repeat = 10

stmt = "train(model)"

setup = "model = ModelParallelResNet50()"
mp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)

setup = "import torchvision.models as models;" + \
        "model = models.resnet50(num_classes=num_classes).to(\'cuda:0\')"
rn_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)


def plot(means, stds, labels, fig_name):
    fig, ax = plt.subplots()
    ax.bar(np.arange(len(means)), means, yerr=stds,
           align=\'center\', alpha=0.5, ecolor=\'red\', capsize=10, width=0.6)
    ax.set_ylabel(\'ResNet50 Execution Time (Second)\')
    ax.set_xticks(np.arange(len(means)))
    ax.set_xticklabels(labels)
    ax.yaxis.grid(True)
    plt.tight_layout()
    plt.savefig(fig_name)
    plt.close(fig)


plot([mp_mean, rn_mean],
     [mp_std, rn_std],
     [\'Model Parallel\', \'Single GPU\'],
     \'mp_vs_rn.png\')

img

结果表明,模型并行需要的执行时间比但GPU实现需要的时间长 4.02/3.75-1=7%。所以我们可以得出结论,在 GPU 之间来回复制张量大约有 7% 的开销。

2.3 问题与方案

2.3.1 目前状况

我们总结一下目前状况:

  • 虽然有多块GPU,但是在整个执行过程中的每一个时刻,只有一个GPU在计算,其他GPU处于空闲状态。
  • 另外还有中间计算结果在GPU之间的拷贝工作,这也使得性能恶化。

因此我们需要针对这两个问题进行针对性处理:

  • 让所有 GPU 都动起来。
  • 减少拷贝传输时间。

2.3.2 解决方案

两个问题解决方案如下:

让所有 GPU 都动起来的一种选择是加入流水线机制:将每个批次进一步划分,组成一个分割(split )管道,这样当一个分割到达第二个子网络时,可以将接下来的分割送入第一个子网络。这样,两个连续的分割(split )就可以在两个 GPU 上同时运行。

为什么可以做到这一点?这是因为 CUDA 的异步并行执行逻辑。

  • CUDA 的一些操作是异步的,比如:核发射,设备间数据拷贝,主机和设备内拷贝小存储块等等。
  • 几乎所有具有计算能力1.1及更高计算能力的CUDA设备都支持并发复制和核执行,即数据拷贝和数值计算可以并行。
  • 一些计算能力2.x的设备可并发执行多个内核。
  • 在一些计算能力2.x的设备上,两个方向的拷贝可以并行(GPU到CPU,CPU到GPU)。

如何减少拷贝传输时间?这个可以使用一些硬件和软件的结合来增加带宽减少延迟,比如:

  • 硬件层面包括:单机内部的PCIe、NVlink、NVSwitch;多机之间的RDMA网络(IB或RoCE)。
  • 软件堆栈包括:GPUDirect的一系列技术:P2P(Peer-to-Peer),RDMA,Async,Storage等。

PyTorch使用了NCCL库(基于CUDA计算)。

2.4 通过流水线输入加速

在接下来的实验中,我们进一步将每个"120 个图像批次" 分成 "20 个图像分割(split)"。由于 PyTorch 异步启动 CUDA 操作,因此实现不需要产生多个线程来实现并发。

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to(\'cuda:1\')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to(\'cuda:1\')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     [\'Model Parallel\', \'Single GPU\', \'Pipelining Model Parallel\'],
     \'mp_vs_rn_vs_pp.png\')

请注意,设备到设备张量复制操作会在源设备和目标设备上的当前流上进行同步。如果创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量可能会导致未定义的行为。上述实现仅在源设备和目标设备上使用默认流,因此没有必要强制执行额外的同步操作。

img

实验结果表明,把流水线输入加入到 ResNet50 的模型并行之后,训练过程加快了大约3.75/2.51-1=49%。虽然它离理想的 100% 加速还很远。由于我们在流水线并行实现中引入了一个新参数split_sizes,因此尚不清楚此新参数如何影响整体训练时间。直观地说,使用小的split_size会导致许多微小的 CUDA 核启动,而使用大split_size结果会导致在第一次和最后一次拆分期间产生相对较长的空闲时间。两者都不是最优的。split_size这个特定实验可能有一个最佳配置。让我们尝试通过使用几个不同的split_size值运行实验来找到它。

means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]

for split_size in split_sizes:
    setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
    pp_run_times = timeit.repeat(
        stmt, setup, number=1, repeat=num_repeat, globals=globals())
    means.append(np.mean(pp_run_times))
    stds.append(np.std(pp_run_times))

fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor=\'red\', fmt=\'ro\')
ax.set_ylabel(\'ResNet50 Execution Time (Second)\')
ax.set_xlabel(\'Pipeline Split Size\')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)

img

结果表明,设置split_size为 12 实现了最快的训练速度,从而导致3.75/2.43-1=54%加速。我们仍有机会进一步加快训练进程。例如,目前所有cuda:0上的操作都放在其默认流上。这意味着下一个拆分的计算不能与上一个拆分的复制操作重叠。但是,由于 prev 和 next 拆分(split)是不同的张量,因此将一个张量的计算与另一个张量的拷贝重叠起来是没有问题的。这种实现需要在两个GPU上使用多个流,并且不同的子网结构需要不同的流管理策略。由于没有一个适用于所有模型并行用例的通用的多流解决方案,我们不会在本教程中讨论它。

这篇文章展示了几个性能测量。在您自己的机器上运行相同的代码时,您可能会看到不同的性能结果,因为结果取决于底层硬件和软件。要为您的环境获得最佳性能,正确的方法是首先生成结果曲线,并根据曲线来确定最佳分割大小,然后将该分割大小应用到管道输入之上。

0x03 分布式问题和方案

我们已经了解了单机之上的模型并行,接下来就要看模型跨越多个服务器的分布式模型并行训练。

3.1 思路

我们先设想一下如果自己实现分布式优化器,应该如何处理。

假如模型分为三个部分,有三个主机可以训练。

+----------------------------------------------------------------+
| Model                                                          |
|                                                                |
| +-----------------+  +------------------+  +-----------------+ |
| | Sub+model 1     |  | Sub+model 2      |  | Sub+model 3     | |
| |                 |  |                  |  |                 | |
| |                 |  |                  |  |                 | |
| +-----------------+  +------------------+  +-----------------+ |
|                                                                |
+----------------------------------------------------------------+

+-------------------+  +------------------+  +-----------------+
| Host 1            |  | Host 2           |  | Host 3          |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
+-------------------+  +------------------+  +-----------------+

我们会显式的把这三个部分分别部署到三个主机之上,在三个主机之上都有一套自己的训练代码,每个训练代码之中都有自己的本地优化器负责优化本地子模型的参数。

+---------------------+         +---------------------+         +---------------------+
| Host 1              |         | Host 2              |         | Host 3              |
|                     |         |                     |         |                     |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
| | Sub model 1     | |forward  | | Sub model 2     | |forward  | | Sub model 3     | |
| |                 | +-------> | |                 | +-------> | |                 | |
| |_parameters <--+ | |         | |_parameters <--+ | |         | |_parameters <--+ | |
| |               | | | <-------+ |               | | | <-------+ |               | | |
| |               | | | backward| |               | | | backward| |               | | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
|                 |   |         |                 |   |         |                 |   |
|                 |   |         |                 |   |         |                 |   |
| ------------------+ |         | +-----------------+ |         | +-----------------+ |
| |Optimizer 1    | | |         | | Optimizer 2   | | |         | | Optimizer 3   | | |
| |               | | |         | |               | | |         | |               | | |
| |    step() +---+ | |         | |    step() +---+ | |         | |     step()+---+ | |
| |                 | |         | |                 | |         | |                 | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
+---------------------+         +---------------------+         +---------------------+

但是这样有几个问题需要我们解决:

  • 如何划分模型到不同机器上?如何把代码分割到不同机器上?
  • 如何跨机器把前向传播,后向传播连接在一起?
  • 各个机器之间是同步运行还是异步运行?
  • 如果是同步,如何让整个系统用同一个步骤运行?
  • 如何把这些优化器结合在一起?还是优化器各做各的,彼此没有任何联系?
  • 如何尽力让用户少修改代码?
  • 如何能让开发者感觉就是开发本地版本代码?

经过思考就会发现,这里面错综复杂。如果我们自己基于 PyTorch 来实现,你会发现这可能最终结果是一个 PipeDream。于是我们看看 PyTorch 如何处理。

3.2 PyTorch 的思路

PyTorch 使用 RPC 来解决这些问题。

3.2.1 四大天王

前文我们提到了,PyTorch的分布式框架使用了四大天王:

  • **远程过程调用 (RPC) ** 使用给定的参数在指定的worker上运行函数并获取返回值或创建对返回值的引用。有三个主要的 API: rpc_sync()(同步)、 rpc_async()(异步)和 remote()(异步并返回对远程返回值的引用)。
    • 如果用户代码在没有返回值的情况下无法继续,请使用同步 API。
    • 否则,使用异步 API 获取 Future,并在调用者需要返回值时等待 Future。
    • remote() API适用如下情况:需要在远程创建某些内容但从不需要将其获取给调用者。
  • 远程引用 (RRef) 是指向本地或远程对象的分布式共享指针,就是本地或者跨机器的变量引用。
  • **Distributed Autograd **将所有参与前向传播 worker的本地 autograd 引擎缝合在一起,并在后向传播期间自动联系它们以计算梯度。在进行前向传递如果需要跨越多台机器时,这尤其有用,例如分布式模型并行训练、参数服务器训练等。 有了这个特性,用户代码不再需要担心如何跨 RPC 边界发送梯度和应该以什么顺序启动本地 autograd 引擎,如果前向传递中有嵌套和相互依赖的 RPC 调用,这可能会变得非常复杂。
  • 分布优化器的构造需要一个 Optimizer()(例如,SGD()Adagrad()等)和一个RRefs的参数列表。即,在每个不同的Ref所有者之上创建一个 Optimizer()实例,然后运行step()相应更新参数。当用户进行分布式前向和后向传播时,参数和梯度将分散在多个 worker 中,因此需要对每个相关 worker 进行优化。Distributed Optimizer 将所有这些本地优化器合而为一,并提供了简洁的构造函数和step()API。

3.2.2 逻辑关系

我们使用官方图示,可以看到 PyTorch 分布式包的内部架构和逻辑关系。分布式优化器基于另外三者之上。

我们会在后续结合代码进行讲解如何使用。

0x04 PyTorch 分布式优化器

首先说明一下,为了清晰的分析,我们后续忽略所有 script 相关部分。

4.1 示例

DistributedOptimizer 的使用方法如下:

  1. 获取要优化的远程参数列表 (RRef)。 这些也可以是包装在本地RRef中的本地参数。
  2. Optimizer 类作为本地优化器来运行所有的RRef owner。
  3. 分布式优化器在每个 worker 节点上创建其本地优化器的实例,并持有这些本地优化器的 RRef。
  4. 当调用 torch.distributed.optim.DistributedOptimizer.step() 时,分布式优化器使用 RPC 在适当的远程 worker 上远程执行所有本地优化器。torch.distributed.optim.DistributedOptimizer.step 必须获得一个分布式autograd context_id作为输入,本地优化器将把梯度保存在相关的context之中。
  5. 如果多个并发的分布式优化器同时更新工作器上的相同参数,则这些更新将通过锁序列化。

看起来有点抽象,我们需要一步一步分析。

4.2 简单的端到端示例

综上所述,以下是使用分布式 autograd 和分布式优化器的简单端到端示例。 如果将代码放入名为“ dist_autograd_simple.py”的文件中,则可以使用命令MASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.py运行该代码:

import multiprocessing as mp
import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context.
    with dist_autograd.context() as context_id: # 本地优化器将把梯度保存在相关的context之中
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor) # 在远端创建一个 random_tensor
        rref2 = rpc.remote(dst_name, random_tensor) # 在远端创建一个 random_tensor
        loss = rref1.to_here() + rref2.to_here() # 获取要优化的远程参数列表 (`RRef`)

        # Backward pass (run distributed autograd).
        dist_autograd.backward([loss.sum()])

        # Build DistributedOptimizer.
        dist_optim = DistributedOptimizer( # 分布式优化器在每个 worker 节点上创建其本地Optimizer的实例,并将持有这些本地优化器的 RRef。
        optim.SGD,
        [rref1, rref2],
        lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step()

def run_process(rank, dst_rank, world_size):
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

processes = []

# Run world_size workers.
world_size = 2
for i in range(world_size):
    p = mp.Process(target=run_process, args=(i, (i + 1) % 2, world_size))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

4.3 定义

DistributedOptimizer 得到了分散在 workers 之上参数的远端引用,然后对于这些参数在本地运行优化器。

对于单个worker来说,如果它接受到来自相同或不同客户端的~torch.distributed.optim.DistributedOptimizer.step的并发调用,则这些调用将会在这个worker之上串行进行,因为每个worker的优化器一次只能处理一组梯度。

DistributedOptimizer 的定义其实看不到啥东西,这是因为 Python 的语言特性,我们没办法在统一地方看到类的成员变量,但是有一个 functional_optim_map 值得我们关注。 这里是把每个内置优化器又配置了一个对应的新优化器,比如 optim.Adagrad 对应的是 _FunctionalAdagrad,我们就选择一个新优化器看看。

class DistributedOptimizer:
    """
    DistributedOptimizer takes remote references to parameters scattered
    across workers and applies the given optimizer locally for each parameter.

    This class uses :meth:`~torch.distributed.autograd.get_gradients` in order
    to retrieve the gradients for specific parameters.

    Concurrent calls to
    :meth:`~torch.distributed.optim.DistributedOptimizer.step`,
    either from the same or different clients, will
    be serialized on each worker -- as each worker\'s optimizer can only work
    on one set of gradients at a time. However, there is no guarantee that
    the full forward-backward-optimizer sequence will execute for one client
    at a time. This means that the gradients being applied may not correspond
    to the latest forward pass executed on a given worker. Also, there is no
    guaranteed ordering across workers.

    `DistributedOptimizer` creates the local optimizer with TorchScript enabled
    by default, so that optimizer updates are not blocked by the Python Global
    Interpreter Lock (GIL) in the case of multithreaded training (e.g. Distributed
    Model Parallel). This feature is currently enabled for most optimizers. You
    can also follow `the recipe`__ in PyTorch tutorials to enable TorchScript support
    for your own custom optimizers.

    Args:
        optimizer_class (optim.Optimizer): the class of optimizer to
            instantiate on each worker.
        params_rref (list[RRef]): list of RRefs to local or remote parameters
            to optimize.
        args: arguments to pass to the optimizer constructor on each worker.
        kwargs: arguments to pass to the optimizer constructor on each worker.
        
    """
    
    # dict to map a user passed in optimizer_class to a functional
    # optimizer class if we have already defined inside the
    # distributed.optim package, this is so that we hide the
    # functional optimizer to user and still provide the same API.
    functional_optim_map = {
        optim.Adagrad: _FunctionalAdagrad,
        optim.Adam: _FunctionalAdam,
        optim.AdamW: _FunctionalAdamW,
        optim.SGD: _FunctionalSGD,
        optim.Adadelta: _FunctionalAdadelta,
        optim.RMSprop: _FunctionalRMSprop,
        optim.Rprop: _FunctionalRprop,
        optim.Adamax: _FunctionalAdamax,
    }        

4.3.1_FunctionalSGD

optim.SGD 对应的是 _FunctionalSGD。其代码位于 torch/distributed/optim/functional_sgd.py。具体是定义一个与TorchScript兼容的函数式SGD优化器,PyTorch 将以函数的方式使用这些优化器。在更新参数时,PyTorch 不使用 param.grad,而是显式地允许分布式优化器将梯度传递给 step 函数。注意:此优化器应该仅由分布式优化器内部使用,而不是向用户公开。

import torch.optim._functional as F

# Define a TorchScript compatible Functional SGD Optimizer
# where we use these optimizer in a functional way.
# Instead of using the `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer pass gradients to
# the `step` function. In this way, we could separate the gradients
# and parameters and allow multithreaded trainer to update the
# parameters without data traces on accumulating to the same .grad.
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalSGD(object):
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False
    ):
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

        # NOTE: we only have one param_group and don\'t allow user to add additional
        # param group as it\'s not a common use case.
        self.param_group = {"params": params}

    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group[\'params\']
        grads = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults[\'lr\']
        weight_decay = self.defaults[\'weight_decay\']
        momentum = self.defaults[\'momentum\']
        dampening = self.defaults[\'dampening\']

        for param, gradient in zip(params, gradients):
            if gradient is not None:
                grads.append(gradient)

                if param not in self.state:
                    self.state[param] = {}

                state = self.state[param]
                if \'momentum_buffer\' not in state:
                    momentum_buffer_list.append(None)
                else:
                    momentum_buffer_list.append(state[\'momentum_buffer\'])

        with torch.no_grad():
            F.sgd(params,
                  grads,
                  momentum_buffer_list,
                  weight_decay=weight_decay,
                  momentum=momentum,
                  lr=lr,
                  dampening=dampening,
                  nesterov=self.nesterov)

        # update momentum_buffers in state
        for i, p in enumerate(params):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state[\'momentum_buffer\'] = momentum_buffer

4.4 初始化

4.4.1 初始化

这部分代码主要对应了:分布式优化器在每个 worker 节点上创建其本地Optimizer的实例,并将持有这些本地优化器的 RRef。具体结合我们之前示例代码来看,params_rref 就是需要优化的参数列表,每个会对应一个优化器,就是 DistributedOptimizer 生成了所有节点上的优化器,以 rpc.RRef(_LocalOptimizer) 形式保存在 self.remote_optimizers 之中。

def __init__(self, optimizer_class, params_rref, *args, **kwargs):
    per_worker_params_rref = defaultdict(list)
    for param in params_rref: # 
        per_worker_params_rref[param.owner()].append(param) # [owner] = param

    # 拿到对应的本地优化器类    
    if optimizer_class in DistributedOptimizer.functional_optim_map and jit._state._enabled:
        optim_ctor = DistributedOptimizer.functional_optim_map.get(optimizer_class)
    else:
        optim_ctor = optimizer_class
    self.is_functional_optim = (optim_ctor != optimizer_class)

    if self.is_functional_optim:
        optimizer_new_func = _new_script_local_optimizer
    else:
        optimizer_new_func = _new_local_optimizer # 下面会介绍

    remote_optim_futs = []
    for worker, param_rrefs in per_worker_params_rref.items():
        remote_optim_rref_fut = rpc.rpc_async(
            worker, # 在 worker 之上生成其本地优化器
            optimizer_new_func, # rpc_async 会调用
            args=(optim_ctor, param_rrefs) + args,
            kwargs=kwargs,
        )
        remote_optim_futs.append(remote_optim_rref_fut)

    self.remote_optimizers = _wait_for_all(remote_optim_futs) # 本地保存的远端各个节点上优化器

4.4.2 生成优化器 _LocalOptimizer

_new_local_optimizer 是生成了_LocalOptimizer

def _new_local_optimizer(optim_cls, local_params_rref, *args, **kwargs):
    return rpc.RRef(
        _LocalOptimizer(optim_cls, local_params_rref, *args, **kwargs))

_LocalOptimizer 是本地优化器,其运行在远端worker节点之上,master 拥有这些优化器的代理。

class _LocalOptimizer(object):
    # Ideally we would only need to share a lock for instances of
    # _LocalOptimizer that deal with the same parameters. We are
    # making a simplifying assumption here that if there is more
    # than one instance of _LocalOptimizer per worker, they will
    # be optimizing the same parameters (e.g. each data parallel
    # trainer will create its own instance of _LocalOptimizer but
    # they will all optimize the same parameters on each worker)
    global_lock = Lock()

    def __init__(self, optim_cls, local_params_rref, *args, **kwargs):
        self._local_params = [rref.local_value() for rref in local_params_rref]
        self.optim = optim_cls( # 优化器还是普通的优化器,因为优化器代码还是之前的,只是优化的参数对象变成了异地节点参数
            self._local_params, # 用参数代理初始化
            *args,
            **kwargs)

    def step(self, autograd_ctx_id):
        # 获取到分布上下文里面计算好的梯度
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step() # 参数优化

4.4.3 等待完成

用 _wait_for_all 等待异步完成。

def _wait_for_all(rpc_futs):
    # TODO: improve error propagation
    exception = None
    results = []
    for fut in rpc_futs:
        try:
            results.append(fut.wait())
        except Exception as e:
            results.append(e)
            exception = e
    if exception is not None:
        raise exception
    return results

对应的逻辑如下:

  • ref1, ref2 是远端待优化的参数,都是 torch.rand((3, 3))。
  • optim_rref1,optim_rref2 分别是 Node 2,Node 3上本地优化器的 rref。
                                                      +----------------------------------+
+--------------------------------------------+        | Node 2                   worker 1|
| Node 1                              master |        |                                  |
|                                            |        |    +--------------------------+  |
|                                            |        |    | _LocalOptimizer          |  |
|  +---------------------------------+       |        |    |                          |  |
|  | DistributedOptimizer            |       |        |    |                          |  |
|  |                                 |       |        |    |   optim = _FunctionalSGD |  |
|  |                                 |       |        |    |                          |  |
|  |     remote_optimizers = [       |       |        |    |   _local_params = rref1  |  |
|  |                optim_rref1 +------------------------> |                     +    |  |
|  |                ,                |       |        |    |                     |    |  |
|  |                optim_rref2 +-------+    |        |    +--------------------------+  |
|  |                ]                |  |    |        |                          |       |
|  |                                 |  |    |        |                          v       |
|  |                                 |  |    |   +-------------->   torch.rand((3, 3))   |
|  |                                 |  |    |   |    |                                  |
|  +---------------------------------+  |    |   |    +----------------------------------+
|                                       |    |   |
|                                       |    |   |    +-----------------------------------+
|                                       |    |   |    | Node 3                   worker 2 |
|                                       |    |   |    |                                   |
|                                       |    |   |    |     +--------------------------+  |
|                                       |    |   |    |     | _LocalOptimizer          |  |
|                                       |    |   |    |     |                          |  |
|                                       +-----------------> |                          |  |
|                                            |   |    |     |   optim = _FunctionalSGD |  |
|                                            |   |    |     |                          |  |
|                             rref1 +------------+    |     |   _local_params = rref2  |  |
|                                            |        |     |                     +    |  |
|                                            |        |     |                     |    |  |
|                             rref2 +------------+    |     +--------------------------+  |
|                                            |   |    |                           |       |
|                                            |   |    |                           |       |
|                                            |   |    |                           v       |
|                                            |   +--------------->   torch.rand((3, 3))   |
|                                            |        |                                   |
+--------------------------------------------+        +-----------------------------------+

4.5 更新参数

DistributedOptimizer 在优化时候,会遍历保存的优化器,逐一调用 _local_optimizer_step。

为什么可以在Node 1 之上统一调用这些远端优化器?因为最后更新所有参数完毕之后,才能调用下一轮前向传播,所以可以统一调用然后等待都完成

def step(self, context_id):
    """
    Performs a single optimization step.

    This will call :meth:`torch.optim.Optimizer.step` on each worker
    containing parameters to be optimized, and will block until all workers
    return. The provided ``context_id`` will be used to retrieve the
    corresponding :class:`~torch.distributed.autograd.context` that
    contains the gradients that should be applied to the parameters.

    Args:
        context_id: the autograd context id for which we should run the
            optimizer step.
    """
    dist_autograd._is_valid_context(context_id)

    if self.is_functional_optim:
        optimizer_step_func = _script_local_optimizer_step
    else:
        optimizer_step_func = _local_optimizer_step # 

    rpc_futs = []
    for optimizer in self.remote_optimizers: # 遍历 _LocalOptimizer
        rpc_futs.append(rpc.rpc_async( # 异步异地调用
            optimizer.owner(),
            optimizer_step_func, # 逐一调用
            args=(optimizer, context_id),
        ))
    _wait_for_all(rpc_futs)

4.5.1 本地优化

_local_optimizer_step 就是得到 _LocalOptimizer,然后调用其 step。

def _local_optimizer_step(local_optim_rref, autograd_ctx_id):
    local_optim = local_optim_rref.local_value()
    local_optim.step(autograd_ctx_id)

_LocalOptimizer 的 step 首先获取分布式梯度,然后用这个梯度进行参数优化。

class _LocalOptimizer(object):

    def step(self, autograd_ctx_id):
        # 获取到分布上下文里面计算好的梯度
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step() # 参数优化

4.5.2 获取分布式梯度

get_gradients 的 Python 代码其实没有意义。

def get_gradients(context_id): # real signature unknown; restored from __doc__
    """
    get_gradients(context_id: int) -> Dict[Tensor, Tensor]
    
    Retrieves a map from Tensor to the appropriate gradient for that Tensor
    accumulated in the provided context corresponding to the given ``context_id``
    as part of the distributed autograd backward pass.
    
    Arguments:
        context_id(int): The autograd context id for which we should retrieve the
                         gradients.
    
    Returns:
        A map where the key is the Tensor and the value is the associated gradient
        for that Tensor.
    
    Example::
        >>> import torch.distributed.autograd as dist_autograd
        >>> with dist_autograd.context() as context_id:
        >>>     t1 = torch.rand((3, 3), requires_grad=True)
        >>>     t2 = torch.rand((3, 3), requires_grad=True)
        >>>     loss = t1 + t2
        >>>     dist_autograd.backward(context_id, [loss.sum()])
        >>>     grads = dist_autograd.get_gradients(context_id)
        >>>     print(grads[t1])
        >>>     print(grads[t2])
    """
    return {}

其对应 C++ 的位于 torch/csrc/jit/runtime/register_distributed_ops.cpp。是调用了上下文的函数。

// Implementations located in
// torch/csrc/jit/runtime/register_distributed_ops.cpp
TORCH_LIBRARY_IMPL(aten, CatchAll, m) {
  m.impl("get_gradients", [](int64_t context_id) {
    const auto& autogradContext =
        dist_autograd::DistAutogradContainer::getInstance().retrieveContext(
            context_id);
    return autogradContext->getGradients(); // 上下文
  });
}

C++世界的 getGradients 代码如下:

const c10::Dict<torch::Tensor, torch::Tensor> DistAutogradContext::
    getGradients() const {
  std::lock_guard<std::mutex> guard(lock_);
  // block current streams before accessing gradients to make sure that
  // gradient computations are finished before use.
  for (auto& entry : gradReadyEvents_) {
    auto& event = entry.second;
    event.block(impl_.getStream(event.device()));
  }
  return accumulatedGrads_; // 分布式梯度累积在这里
}

在 torch/csrc/distributed/autograd/context/context.h之中有:

// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable..
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;

所以我们逻辑拓展如下:

  1. DistributedOptimizer 调用 optim_rref1 和 optim_rref2 的 step 方法在远端 worker 之上进行运行,优化。
  2. Worker 1 和 worker 2 之上的 _LocalOptimizer 分别获得对本地 _local_params_ 进行优化。
  3. 优化结果在 _Node DistAutogradContext 之中的accumulatedGrads_累积。

这样,整个模型的各个子模型就在各个 Node 之上以统一的步骤进行训练/优化。

                                                   +--------------------------------------+
                                                   | Node 2                      worker 1 |
                                                   |                                      |
                                                   |    +--------------------------+      |
                                                   |    | DistAutogradContext      |      |
                                                   |    |                          |  3   |
                                                   |    |     accumulatedGrads_ <------+  |
+-----------------------------------------+        |    |                          |   |  |
| Node 1                           master |        |    +--------------------------+   |  |
|                                         |        |    +--------------------------+   |  |
| +-------------------------------+       |  +--------> | _LocalOptimizer          |   |  |
| | DistributedOptimizer          |       |  |     |    |                          |   |  |
| |                               |       |  |     |    |   optim = _FunctionalSGD |   |  |
| |                               |       |  |     |    |                          |   |  |
| |   remote_optimizers = [       |       |  |     |    |   _local_params = rref1  |   |  |
| |              optim_rref1 +---------------+     |    |                     +    |   |  |
| |              ,                |       |     +---------> step() +-------------------+  |
| |              optim_rref2 +-------+    |     |  |    |                     |    |      |
| |                               |  |    |     |  |    +--------------------------+      |
| |              ]           +----------------->+  |                        2 |           |
| |                          |    |  |    |        |                          v           |
| |                          |    |  |    |   +----------------> torch.rand((3, 3))       |
| |                        1 |    |  |    |   |    |                                      |
| |   step() {               |    |  |    |   |    +--------------------------------------+
| |                          |    |  |    |   |
| |     optim_rref1.step()+--+    |  |    |   |    +--------------------------------------+
| |                               |  |    |   |    | Node 3                      worker 2 |
| |     optim_rref2.step()+--+    |  |    |   |    |                                      |
| |                          |    |  |    |   |    |     +--------------------------+     |
| |   }                      |    |  |    |   |    |     | _LocalOptimizer          |     |
| |                          |    |  |    |   |    |     |                          |     |
| +-------------------------------+  +-----------------> |                          |     |
|                            |            |   |    |     |   optim = _FunctionalSGD |     |
|                            |            |   |    |     |                          |     |
|                          1 |            |   |    |     |   _local_params = rref2  |     |
|                            |            |   |    |     |                     +    |  3  |
|                            +-----------------------------> step() +------------------v  |
|                                         |   |    |     |                     |    |  |  |
|                         rref1 +-------------+    |     +--------------------------+  |  |
|                                         |        |                        2  |       |  |
|                                         |        |                           v       |  |
|                         rref2 +-------------------------------> torch.rand((3, 3))   |  |
|                                         |        |                                   |  |
+-----------------------------------------+        |     +--------------------------+  |  |
                                                   |     | DistAutogradContext      |  |  |
                                                   |     |                          |  |  |
                                                   |     |     accumulatedGrads_ <-----+  |
                                                   |     |                          |     |
                                                   |     +--------------------------+     |
                                                   +--------------------------------------+

0x05 PipeDream 优化器

最后,我们来看看 PipeDream,看看它是怎么实现分布式优化器的,我们探寻的思路是:

  • 因为PipeDream是在每个worker之上启动全部代码,所以每个本地优化器如何确定自己要优化的参数?
  • 优化时候如何更新参数?

5.1 如何确定优化参数

我们先提前说一下:

  • 每个node的module不同,所以每个优化器的待优化参数是本地module的参数。
  • 每个node优化自己负责的部分module。

我们需要从头梳理。

5.1.1 main 方法

来到 runtime/translation/main_with_runtime.py。这里首先构建一个 StageRuntime,然后用 StageRuntime 的参数来构建优化器。

def main():
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
    
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)    

5.1.2 构建runtime

StageRuntime 的 initialize 函数会构建 module,这里通过本 node 的stage 来构建自己的 modules。

我们从前面文章中摘录。

stage_to_module_map 就是设置 stage 到 modules 的关系,目的是为了得到本stage所对应的modules。

本stage(数值为 3)对应的是 index 为 3,4 的两个 module,就是下面的 3 ,3.

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]

具体代码是:

def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):
  
        if module_to_stage_map is None:
            self.modules_with_dependencies = ModulesWithDependencies(model)
        else:
            # 依据本stage来找到自己的modules。
            modules = stage_to_module_map[self.stage]
            self.modules_with_dependencies = ModulesWithDependencies(
                [model[module] for module in modules])
        
        # 确定哪些模型layers
        modules = self.modules_with_dependencies.modules()            

        # 拿到 master_parameters 和 model_parameters
        if self.fp16:
            self.master_parameters = []
            self.model_parameters = []
            for i in range(len(modules)):
                import apex.fp16_utils as fp16_utils
                module_parameters, module_master_parameters = \
                    fp16_utils.prep_param_lists(modules[i])
                self.master_parameters.extend(module_master_parameters)
                self.model_parameters.extend(module_parameters)
        else:
            self.master_parameters = list(self.parameters())
            self.model_parameters = None     
            
            

比如模型被分配到两个node之上,每个node两个layers,这里 Node 2有一个DDP数据并行。

每个 Node 的模型参数就是不同的,Node 1 的待优化参数是 Layer 1,Layer 2 的参数;Node 2 的待优化参数是 Layer 3,Layer 4 的参数。

                                              Node 2
                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
Node 1                                        |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   model_parameters                    |     |      | +------------------------+ |    |
|                                       |     |      +----------------------------+    |
+---------------------------------------+     |                                        |
                                              |                                        |
                                              |  master_parameters = Parameters(       |
                                              |                      Layer 3, Layer 4) |
                                              |                                        |
                                              |                                        |
                                              |  model_parameters                      |
                                              |                                        |
                                              +----------------------------------------+

5.1.3 SGDWithWeightStashing

然后用 runtime 的 master_parameters 和 model_parameters 构建本地优化器 SGDWithWeightStashing。

OptimizerWithWeightStashing 是 SGDWithWeightStashing 的基类。

class SGDWithWeightStashing(OptimizerWithWeightStashing): # 基类
    """
    SGD optimizer with weight stashing.
    """
    def __init__(self, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, lr=required, momentum=0,
                 dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
                 macrobatch=False):
        super(SGDWithWeightStashing, self).__init__(
            optim_name=\'SGD\',
            modules=modules, master_parameters=master_parameters,
            model_parameters=model_parameters, loss_scale=loss_scale,
            num_versions=num_versions, lr=lr, momentum=momentum,
            dampening=dampening, weight_decay=weight_decay,
            nesterov=nesterov, verbose_freq=verbose_freq,
            macrobatch=macrobatch,
        )

基类 OptimizerWithWeightStashing 会生成一个原生优化器,赋值在 base_optimizer。

class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.

    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
                      base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """

    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale

        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions)
        self.num_versions = num_versions
        
        # 生成一个原生优化器
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0

        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1

逻辑拓展如下,每个优化器使用自己 Node 的参数进行优化。

                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
                                              |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                             +         |     |      | |                        | |    |
|   model_parameters          |         |     |      | +------------------------+ |    |
|                             |         |     |      +----------------------------+    |
|  +---------------------------------+  |     |                                        |
|  |SGDWithWeightStashing     |      |  |     |                                        |
|  |                          |      |  |     |  master_parameters = Parameters(       |
|  |   base_optimizer = SGB(  v      |  |     |                      Layer 3, Layer 4) |
|  |              master_parameters) |  |     |                               +        |
|  |                                 |  |     |  model_parameters             |        |
|  +---------------------------------+  |     |                               |        |
|                                       |     |  +----------------------------------+  |
+---------------------------------------+     |  |SGDWithWeightStashing       |     |  |
                                              |  |                            |     |  |
                                              |  |      base_optimizer = SGB( v     |  |
                                              |  |               master_parameters) |  |
                                              |  +----------------------------------+  |
                                              |                                        |
                                              +----------------------------------------+

5.2 优化

5.2.2 整体优化

整体是异步运行,也就是异步优化。

def train(train_loader, r, optimizer, epoch):

  	# 省略其他
    
    # start num_warmup_minibatches forward passes
    for i in range(num_warmup_minibatches):
        r.run_forward()

    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward()

        # perform backward pass
        if args.fp16:
            r.zero_grad()
        else:
            optimizer.zero_grad()
        optimizer.load_old_params()

        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # finish remaining backward passes
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params()
        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # wait for all helper threads to complete
    r.wait()

5.2.2 优化器优化

优化直接使用 SGDWithWeightStashing 的 step 方法。其最后也是 class OptimizerWithWeightStashing(torch.optim.Optimizer) 的 step 方法。

def step(self, closure=None):
    """Performs a single optimization step.

    Arguments:
        closure (callable, optional): A closure that reevaluates the model
                                      and returns the loss.
    """
    # Update the gradient every `update_interval` steps.
    if self.batch_counter % self.update_interval != self.update_interval - 1:
        self.batch_counter += 1
        return None

    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.model_grads_to_master_grads(self.model_parameters,
                                               self.master_parameters)
        if self.loss_scale != 1.0:
            # 处理梯度
            for parameter in self.master_parameters:
                parameter.grad.data = parameter.grad.data / self.loss_scale

    for p in self.param_groups[0][\'params\']:
        if p.grad is not None: # 继续处理累积的梯度
            p.grad.div_(self.update_interval)

    loss = self.base_optimizer.step() # 进行优化
    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.master_params_to_model_params(self.model_parameters,
                                                 self.master_parameters)
    self.latest_version = self.latest_version.incr()
    if self.num_versions > 1:
        self.buffered_state_dicts = self.queue[0][0]
        self.queue.append(self.get_params(clone=False))

    self.batch_counter += 1
    return loss

具体如下:

                                               Node 2
                                               +-----------------------------------------+
                                               | Stage 2                    StageRuntime |
                                               |                                         |
Node 1                                         |           CommunicationHandler          |
+-----------------------------------------+    |                                         |
| Stage 1                    StageRuntime |    |      +----------------------------+     |
|                                         |    |      | +------------------------+ |     |
|                                         |    |      | |Rank 2                  | |     |
|          CommunicationHandler           |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|       +-----------------------+         |    |      | |  Layer 3 +---> Layer 4 | |     |
|       |Rank 1                 |         |    |      | |                        | |     |
|       |                       |         |    | DDP  | |                        | |     |
|       | Layer 1 +---> Layer 2 |         +---------->+ +------------------------+ |     |
|       |                       |         |    |      | +------------------------+ |     |
|       |                       |         |    |      | |Rank 3                  | |     |
|       +-----------------------+         |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|  master_parameters = Parameters(        |    |      | |  Layer 3 +---> Layer 4 | |     |
|                  Layer 1, Layer 2)      |    |      | |                        | |     |
|                                +        |    |      | |                        | |     |
|  model_parameters              |        |    |      | +------------------------+ |     |
|                                |        |    |      +----------------------------+     |
|  step()                        |        |    |                                         |
|   +                            |        |    |                                         |
|   |                            |        |    |  master_parameters = Parameters(        |
|   |  +-------------------------------+  |    |                      Layer 3, Layer 4)  |
|   |  |SGDWithWeightStashing    |     |  |    |                                   +     |
|   |  |                         |     |  |    |  model_parameters                 |     |
|   |  |   base_optimizer = SGB( v     |  |    |                                   |     |
|   |  |            master_parameters) |  |    |  step()                           |     |
|   |  |                               |  |    |   +                               |     |
|   +----> base_optimizer.step()       |  |    |   |                               |     |
|      |                               |  |    |   |  +-------------------------------+  |
|      +-------------------------------+  |    |   |  |SGDWithWeightStashing       |  |  |
|                                         |    |   |  |                            |  |  |
+-----------------------------------------+    |   |  |      base_optimizer = SGB( v  |  |
                                               |   |  |            master_parameters) |  |
                                               |   |  |                               |  |
                                               |   +------>  base_optimizer.step()    |  |
                                               |      |                               |  |
                                               |      +-------------------------------+  |
                                               |                                         |
                                               +-----------------------------------------+

至此,分布式优化器系列完成,在后续分析ZeRO时候,我们还会介绍 PyTorch ZeroRedundancyOptimizer,估计要等待几周之后了。我们从下一篇开始,介绍 PyTorch 分布式 的几个官方文档应用例子,以此来把 PyTorch 分布式整个逻辑串联起来看看在实际之中应该如何应用,敬请期待。

0xFF 参考

torch.optim.optimizer源码阅读和灵活使用

pytorch源码阅读(二)optimizer原理

pytorch 优化器(optim)不同参数组,不同学习率设置的操作

Pytorch——momentum动量

各种优化方法总结比较(sgd/momentum/Nesterov/adagrad/adadelta)

【优化器】优化器算法及PyTorch实现(一):永不磨灭的SGD

以optim.SGD为例介绍pytorch优化器

Pytorch学习笔记08----优化器算法Optimizer详解(SGD、Adam)

pytorch中使用torch.optim优化神经网络以及优化器的选择 - pytorch中文网

pytorch优化器详解:SGD

聊聊GPU通信那些事

https://developer.nvidia.com/gpudirect

https://www.nvidia.cn/data-center/magnum-io/

https://www.nvidia.cn/data-center/nvlink/

分类:

技术点:

相关文章: