似乎没有 keras 构建方法可以做到这一点,但如果我错了,请纠正我。
我的方法
Dataset.shuffle 在内部使用初始种子值生成种子,用于在迭代期间重新洗牌reshuffle_each_iteration=True。因此,为特定时期重新创建相同的顺序并在该特定批次继续训练时期,我们必须重新创建具有相同种子的数据集并将数据集迭代器移动到相同时期和相同批次。
调试
为了调试并确保以相同的顺序生成 epoch 和 batch,我们需要一种方法来打印每个 epoch-batch 中数据点的拾取方式。这很棘手,因此出于调试目的,我将使用回归问题并将基本事实作为序列号。然后我可以有一个自定义损失,我可以在其中打印基本事实并使用户的顺序正确。
模型和数据
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import keras.backend as K
# Data
x_train = np.random.randn(15, 10).astype("float32")
y_train = np.arange(15).astype("float32")
# Custom MSE looss just to track the order in which data is picked up
def my_mse(y_true, y_pred):
tf.print(tf.keras.backend.flatten(y_true))
loss = K.square(y_pred - y_true)
loss = K.sum(loss, axis=1)
return loss
# Model
def get_model():
inputs = keras.Input(shape=(10))
outputs = layers.Dense(1, activation="linear")(inputs)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer="rmsprop",
loss=my_mse,
)
return model
数据集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
epochs = 2
print ("Runs 1")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
print ("Runs 2")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)
输出:
Runs 1
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
Runs 2
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
是的,使用种子复制订单。
现在让我们编写一个方法将数据集转发到某个时期和批次组合
def forward(dataset, n=None):
if not n:
return dataset
i = 0
while True:
for _ in dataset:
i += 1
if i == n:
return dataset
测试用例:
让我们正常运行并观察顺序
从头开始的数据
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)
model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
数据集第 n 个状态的数据
让我们的数据集进行第 4 次迭代并运行训练
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)
model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
很好,现在我们知道如何正确转发数据集了。现在让我们编写回调来跟踪当前的迭代次数:
跟踪迭代的自定义回调(epoch-batch 组合)
现在我们需要确定模型被检查指向的时期和批次组合。如果我们有这些信息,我们可以加载最后一个检查点模型并将我们的数据集转发到它的批次和时期组合并继续训练。我们将使用回调来做到这一点
class MyCustomCallback(tf.keras.callbacks.ModelCheckpoint, keras.callbacks.Callback):
def __init__(self, the_id=0, **args):
self.the_id = the_id
self.epoch = 0
super().__init__(**args)
def _save_model(self, epoch, logs):
logs['the_id'] = self.the_id
super()._save_model(epoch, logs)
def on_batch_end(self, batch, logs={}):
self.the_id += 1
super().on_batch_end(batch, logs)
checkpoint_filepath = 'checkpoint-{the_id}'
model_checkpoint_callback = MyCustomCallback(
filepath=checkpoint_filepath,
save_freq=2,
save_best_only=False)
model = get_model()
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)
model.fit(train_dataset, epochs=5, verbose=0, callbacks=[model_checkpoint_callback], workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
我们每两批检查一次。所以让我们假设它崩溃并且最后一个检查点是checkpoint-4。我们可以加载这个模型并将我们的数据集转发到 4 并继续训练。
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)
model = get_model()
model.fit(train_dataset, epochs=2, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]