【发布时间】:2021-09-08 06:12:18
【问题描述】:
注意:
经过实验,我注意到这个问题只在我在 GPU 上训练时出现。我创建了一个 github 问题 (#50454)。在这一点上,我不确定到底发生了什么。
我正在研究梯度累积的实现。但是,这些方法似乎都不起作用。下面我将描述两种理论上可行但似乎与 Tensorflow 冲突的方法。
想法
我想修补任意的Optimizer-instance,方法是用我自己的累积梯度的实现替换它的apply_gradients() 函数。
# Build model first
model.build()
# Patch the optimizer
optimizer = get_patched_optimizer(optimizer, n, model.trainable_variables)
# Compile the model with the patched optimizer
model.compile(optimizer=optimizer)
在哪里
def get_patched_optimizer(optimizer, n, trainable_variables):
"""Patch optimizer for gradient accumulation.
:param optimizer:
The optimizer to patch.
:param n:
The number of accumulation steps before applying gradients.
:param trainable_variables:
Trainable parameters of the model
:return:
A patched patched optimizer for gradient accumulation.
"""
accumulator = _GradientAccumulationPatch(
n=n,
orig_apply_gradients=optimizer.apply_gradients,
trainable_variables=trainable_variables
)
# Replace the original function
optimizer.apply_gradients = accumulator.apply_gradients
return optimizer
快乐(但不工作)的道路
最简单的方法是累积梯度并有条件地应用梯度,例如每当current_step % n == 0。
但是,这里的问题是,与他们在Gradient Accumulation with Custom model.fit in TF.Keras? 中的使用方式相比,在这种情况下我似乎无法使用tf.cond()。
使用tf.cond() 会产生以下RuntimeError
RuntimeError:
merge_call在定义新图形或 tf.function 时调用。如果传递给strategy.run()的函数fn包含嵌套的@tf.function,并且嵌套的@tf.function包含同步点,例如聚合梯度(例如,optimizer.apply_gradients),或者函数@ 987654340@ 使用控制流语句,其中包含主体中的同步点。尚不支持此类行为。相反,请避免嵌套tf.functions 或可能跨越同步边界的控制流语句,例如,将传递给strategy.run的fn或整个strategy.run包装在tf.function内或移动控制流来自fn
这里是_GradientAccumulationPatch使用tf.cond()的实现:
class _GradientAccumulationPatch:
def __init__(
self,
n: int,
orig_apply_gradients,
trainable_variables
):
self.n = tf.constant(n, dtype=tf.int64)
policy = tf.keras.mixed_precision.global_policy()
self.variable_dtype = policy.variable_dtype
self.accu_gradients = [
tf.Variable(
tf.zeros(g.shape, dtype=g.dtype),
) for g in trainable_variables
]
self._current_step = tf.Variable(0, dtype=tf.int64)
self._orig_apply_gradients = orig_apply_gradients
def apply_gradients(self, grads_and_vars, *args, **kwargs):
trainable_variables = [var for (_, var) in grads_and_vars]
gradients = [grad for (grad, _) in grads_and_vars]
# Always accumulate gradients
for i, grad in enumerate(gradients):
self.accu_gradients[i].assign_add(grad)
tf.cond(
self._can_apply_on_next_step(),
true_fn=lambda: self.apply_accu_gradients(trainable_variables, args, kwargs),
false_fn=lambda: None
)
def apply_accu_gradients(self, trainable_variables, *args, **kwargs):
# Call the original apply_gradients() function
self._orig_apply_gradients(zip(self.accu_gradients, trainable_variables), *args, **kwargs)
# Reset all accumulated gradients to zero
for i in range(len(self.accu_gradients)):
self.accu_gradients[i].assign(tf.zeros_like(trainable_variables[i]))
def _can_apply_on_next_step(self):
"""
:return: True if gradients should be applied; False otherwise.
"""
# Increment (always do this first)
self._current_step.assign_add(1)
count_mod_steps = tf.math.mod(self._current_step, self.n)
return tf.equal(count_mod_steps, 0)
更复杂的路径(也行不通)
可以通过简单地使用由_can_apply_on_next_step() 给出的信号apply 作为乘法因子来移除tf.cond(),并在我们处于累积阶段时应用零梯度。
我们的想法是始终累积梯度并始终通过一个特定的更改应用它们:
final_gradients = [grad * apply for grad in gradients]
self._orig_apply_gradients(zip(final_gradients, trainable_variables))
这就是我们更改 apply_gradients() 方法的方式:
def apply_gradients(self, grads_and_vars, *args, **kwargs):
can_apply = self._can_apply_on_next_step()
# 1.0 whenever we want to apply gradients; 0.0 otherwise
apply = tf.cast(can_apply, dtype=self.variable_dtype)
# Will be 0.0 if apply is 1.0 and vice versa
keep = tf.cast(tf.logical_not(can_apply), dtype=self.variable_dtype)
grads_and_vars = list(grads_and_vars)
gradients = [grad for (grad, _) in grads_and_vars]
trainable_variables = [var for (_, var) in grads_and_vars]
# Accumulate gradients
for i, grad in enumerate(gradients):
self.accu_gradients[i].assign_add(grad)
# Multiply each gradient with our apply-signal
final_gradients = [grad * apply for grad in self.accu_gradients]
self._orig_apply_gradients(zip(final_gradients, trainable_variables), *args, **kwargs)
# This will reset our buffer whenever "keep" is 0.0
for g in self.accu_gradients:
g.assign(g * keep)
但问题是self.accu_gradients[i].assign_add(grad)似乎没有任何作用。是的,我也试过了
self.accu_gradients[i].assign(grad + self.accu_gradients[i])
有趣的是,如果我使用 assign(grad) 代替 self.accu_gradients[i].assign_add(grad),模型开始收敛:
blue: just using assign() # <- no accumulation happening
red: using assign_add()
train_step()
这个补丁应该独立工作模型。我的模型确实有一个自定义的train_step(),但实现非常简单。
这里我只是计算gradients,然后是优化器的所有apply_gradients()方法:
def train_step(self, data):
(inputs, (input_lengths, label_lengths), mask), y_true = data
loss, gradients = self.rnnt_gradient(
inputs=inputs,
y_true=y_true,
input_lengths=input_lengths,
label_lengths=label_lengths,
mask=mask
)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return {'loss': loss}
def test_step(self, data):
(inputs, (input_lengths, label_lengths), mask), y_true = data
val_loss = self.rnnt_loss_wrapper(
inputs=inputs,
y_true=y_true,
input_lengths=input_lengths,
label_lengths=label_lengths,
mask=mask
)
return dict(loss=val_loss)
def rnnt_gradient(
self,
inputs: tuple,
y_true: tf.Tensor,
input_lengths: tf.Tensor,
label_lengths: tf.Tensor,
mask=None
):
with tf.GradientTape() as tape:
model_loss = self.rnnt_loss_wrapper(
inputs,
y_true=y_true,
input_lengths=input_lengths,
label_lengths=label_lengths,
mask=mask
)
is_mixed_precision = isinstance(self.optimizer, mixed_precision.LossScaleOptimizer)
# We always want to return the unmodified model_loss for Tensorboard
if is_mixed_precision:
loss = self.optimizer.get_scaled_loss(model_loss)
else:
loss = model_loss
gradients = tape.gradient(loss, self.trainable_variables)
if is_mixed_precision:
gradients = self.optimizer.get_unscaled_gradients(gradients)
return model_loss, gradients
【问题讨论】:
-
@M.Innat 本质上,他们所做的完全一样,但我想在优化器补丁中而不是在模型端实现 GA。我在使用他们的方法时面临的问题是
tf.cond在这种情况下不起作用。不知道为什么这对他们有用。 -
你检查过this吗?
-
@M.Innat 是的,我也看到了。我还更新了我的问题,以澄清我已经尝试过什么以及在我的情况下到底什么不起作用。
标签: tensorflow