【问题标题】:Why are these Gradient Accumulation implementations not working?为什么这些梯度累积实现不起作用?
【发布时间】:2021-09-08 06:12:18
【问题描述】:

注意:

经过实验,我注意到这个问题只在我在 GPU 上训练时出现。我创建了一个 github 问题 (#50454)。在这一点上,我不确定到底发生了什么。

我正在研究梯度累积的实现。但是,这些方法似乎都不起作用。下面我将描述两种理论上可行但似乎与 Tensorflow 冲突的方法。

想法

我想修补任意的Optimizer-instance,方法是用我自己的累积梯度的实现替换它的apply_gradients() 函数。

# Build model first
model.build()

# Patch the optimizer
optimizer = get_patched_optimizer(optimizer, n, model.trainable_variables)

# Compile the model with the patched optimizer
model.compile(optimizer=optimizer)

在哪里

def get_patched_optimizer(optimizer, n, trainable_variables):
    """Patch optimizer for gradient accumulation.

    :param optimizer:
        The optimizer to patch.
    :param n:
        The number of accumulation steps before applying gradients.
    :param trainable_variables:
        Trainable parameters of the model
    :return:
        A patched patched optimizer for gradient accumulation.
    """
    accumulator = _GradientAccumulationPatch(
        n=n,
        orig_apply_gradients=optimizer.apply_gradients,
        trainable_variables=trainable_variables
    )

    # Replace the original function
    optimizer.apply_gradients = accumulator.apply_gradients

    return optimizer

快乐(但不工作)的道路

最简单的方法是累积梯度并有条件地应用梯度,例如每当current_step % n == 0

但是,这里的问题是,与他们在Gradient Accumulation with Custom model.fit in TF.Keras? 中的使用方式相比,在这种情况下我似乎无法使用tf.cond()

使用tf.cond() 会产生以下RuntimeError

RuntimeError: merge_call 在定义新图形或 tf.function 时调用。如果传递给strategy.run() 的函数fn 包含嵌套的@tf.function,并且嵌套的@tf.function 包含同步点,例如聚合梯度(例如,optimizer.apply_gradients),或者函数@ 987654340@ 使用控制流语句,其中包含主体中的同步点。尚不支持此类行为。相反,请避免嵌套tf.functions 或可能跨越同步边界的控制流语句,例如,将传递给strategy.runfn 或整个strategy.run 包装在tf.function 内或移动控制流来自fn

这里是_GradientAccumulationPatch使用tf.cond()的实现:

class _GradientAccumulationPatch:

    def __init__(
        self,
        n: int,
        orig_apply_gradients,
        trainable_variables
    ):
        self.n = tf.constant(n, dtype=tf.int64)
        policy = tf.keras.mixed_precision.global_policy()
        self.variable_dtype = policy.variable_dtype
        self.accu_gradients = [
            tf.Variable(
                tf.zeros(g.shape, dtype=g.dtype),
            ) for g in trainable_variables
        ]

        self._current_step = tf.Variable(0, dtype=tf.int64)
        self._orig_apply_gradients = orig_apply_gradients

    def apply_gradients(self, grads_and_vars, *args, **kwargs):

        trainable_variables = [var for (_, var) in grads_and_vars]
        gradients = [grad for (grad, _) in grads_and_vars]

        # Always accumulate gradients
        for i, grad in enumerate(gradients):
            self.accu_gradients[i].assign_add(grad)

        tf.cond(
            self._can_apply_on_next_step(),
            true_fn=lambda: self.apply_accu_gradients(trainable_variables, args, kwargs),
            false_fn=lambda: None
        )

    def apply_accu_gradients(self, trainable_variables, *args, **kwargs):

        # Call the original apply_gradients() function
        self._orig_apply_gradients(zip(self.accu_gradients, trainable_variables), *args, **kwargs)

        # Reset all accumulated gradients to zero
        for i in range(len(self.accu_gradients)):
            self.accu_gradients[i].assign(tf.zeros_like(trainable_variables[i]))

    def _can_apply_on_next_step(self):
        """
        :return: True if gradients should be applied; False otherwise.
        """
        # Increment (always do this first)
        self._current_step.assign_add(1)
        count_mod_steps = tf.math.mod(self._current_step, self.n)
        return tf.equal(count_mod_steps, 0)

更复杂的路径(也行不通)

可以通过简单地使用由_can_apply_on_next_step() 给出的信号apply 作为乘法因子来移除tf.cond(),并在我们处于累积阶段时应用零梯度。

我们的想法是始终累积梯度并始终通过一个特定的更改应用它们:

final_gradients = [grad * apply for grad in gradients]
self._orig_apply_gradients(zip(final_gradients, trainable_variables))

这就是我们更改 apply_gradients() 方法的方式:

def apply_gradients(self, grads_and_vars, *args, **kwargs):

    can_apply = self._can_apply_on_next_step()
    # 1.0 whenever we want to apply gradients; 0.0 otherwise
    apply = tf.cast(can_apply, dtype=self.variable_dtype)
    # Will be 0.0 if apply is 1.0 and vice versa
    keep = tf.cast(tf.logical_not(can_apply), dtype=self.variable_dtype)

    grads_and_vars = list(grads_and_vars)
    gradients = [grad for (grad, _) in grads_and_vars]
    trainable_variables = [var for (_, var) in grads_and_vars]

    # Accumulate gradients
    for i, grad in enumerate(gradients):
        self.accu_gradients[i].assign_add(grad)

    # Multiply each gradient with our apply-signal
    final_gradients = [grad * apply for grad in self.accu_gradients]

    self._orig_apply_gradients(zip(final_gradients, trainable_variables), *args, **kwargs)

    # This will reset our buffer whenever "keep" is 0.0
    for g in self.accu_gradients:
        g.assign(g * keep)

但问题是self.accu_gradients[i].assign_add(grad)似乎没有任何作用。是的,我也试过了

self.accu_gradients[i].assign(grad + self.accu_gradients[i])

有趣的是,如果我使用 assign(grad) 代替 self.accu_gradients[i].assign_add(grad),模型开始收敛:

blue: just using assign()   # <- no accumulation happening
red:  using assign_add()

train_step()

这个补丁应该独立工作模型。我的模型确实有一个自定义的train_step(),但实现非常简单。

这里我只是计算gradients,然后是优化器的所有apply_gradients()方法:

def train_step(self, data):

    (inputs, (input_lengths, label_lengths), mask), y_true = data

    loss, gradients = self.rnnt_gradient(
        inputs=inputs,
        y_true=y_true,
        input_lengths=input_lengths,
        label_lengths=label_lengths,
        mask=mask
    )

    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

    return {'loss': loss}

def test_step(self, data):

    (inputs, (input_lengths, label_lengths), mask), y_true = data

    val_loss = self.rnnt_loss_wrapper(
        inputs=inputs,
        y_true=y_true,
        input_lengths=input_lengths,
        label_lengths=label_lengths,
        mask=mask
    )

    return dict(loss=val_loss)


def rnnt_gradient(
    self,
    inputs: tuple,
    y_true: tf.Tensor,
    input_lengths: tf.Tensor,
    label_lengths: tf.Tensor,
    mask=None
):
    with tf.GradientTape() as tape:
        model_loss = self.rnnt_loss_wrapper(
            inputs,
            y_true=y_true,
            input_lengths=input_lengths,
            label_lengths=label_lengths,
            mask=mask
        )

        is_mixed_precision = isinstance(self.optimizer, mixed_precision.LossScaleOptimizer)

        # We always want to return the unmodified model_loss for Tensorboard
        if is_mixed_precision:
            loss = self.optimizer.get_scaled_loss(model_loss)
        else:
            loss = model_loss

        gradients = tape.gradient(loss, self.trainable_variables)

        if is_mixed_precision:
            gradients = self.optimizer.get_unscaled_gradients(gradients)

        return model_loss, gradients

【问题讨论】:

  • @M.Innat 本质上,他们所做的完全一样,但我想在优化器补丁中而不是在模型端实现 GA。我在使用他们的方法时面临的问题是tf.cond 在这种情况下不起作用。不知道为什么这对他们有用。
  • 你检查过this吗?
  • @M.Innat 是的,我也看到了。我还更新了我的问题,以澄清我已经尝试过什么以及在我的情况下到底什么不起作用。

标签: tensorflow


【解决方案1】:

事实证明,这完全是我的错,因为每当我使用 mixed_float16 策略进行训练时,我都会修补错误实例。

我所拥有的是这样的:

if precision_policy.name.startswith('mixed'):
    logger.info(f'Using LossScaleOptimizer (policy: "{precision_policy.name})"')
    optimizer = keras.mixed_precision.LossScaleOptimizer(optimizer)

if grad_acc_n > 1:
    # --> This patched the LossScaleOptimizer which caused the problem:
    optimizer = grad_acc.get_patched_optimizer(optimizer=optimizer, n=grad_acc_n)

所以我需要这样的检查:

if isinstance(optimizer, keras.mixed_precision.LossScaleOptimizer):
    # Warning: This does NOT work either (just an example)!
    optimizer.inner_optimizer.apply_gradients = accumulator.apply_gradients
    raise Exception('Don\'t do this!')
else:
    optimizer.apply_gradients = accumulator.apply_gradients

但是,正如评论中所述,修补 inner_optimizer 也不起作用。我还没弄清楚为什么,但至少我现在可以使用我的_GradientAccumulationPatch-implementation 运行“正常”float32-policy 培训。

【讨论】:

    猜你喜欢
    • 2019-04-19
    • 2022-12-12
    • 2020-12-28
    • 1970-01-01
    • 2021-01-04
    • 2021-09-02
    • 1970-01-01
    • 2011-04-14
    • 1970-01-01
    相关资源
    最近更新 更多