【问题标题】:tf.data: Combining multiple from_generator() datasets to create batches padded across time windowstf.data:组合多个 from_generator() 数据集以创建跨时间窗口填充的批次
【发布时间】:2018-11-09 20:52:32
【问题描述】:

我正在研究一个时间序列问题,其中每个时间序列都相当长(10^3-10^4 时间步长,每个时间序列的长度不同)。

对于每个序列,我可以定义一个 Python 生成器,它一次生成一个时间步长的值。我正在使用tf.data.Dataset.from_generator() 构造函数将这些生成器包装到 tf.data API 中。文档建议使用 from_generator()tf.contrib.data.parallel_interleave() 转换来并行从我的 Python 生成器中提取。

我对这些数据的下游使用是有状态的 RNN(例如 LSTM 或 GRU)。我想将时间序列分成更小的(~10^2)窗口,并将每个块用作训练示例(即截断的 BPTT)。由于我的数据是流式传输的,我认为这意味着在将每个生成器通过管道传递之前保存 window_size 时间步长,以便与其他生成器的数据进行批处理。我还想跨这些块保存 RNN 状态,以便我仍然可以学习长期依赖关系。

我的问题是想要创建这些生成器的批量输出的填充批次。理想情况下,我想向我的神经网络展示生成器输出的窗口,并在生成器的某些子集先于其他子集耗尽自身时根据需要进行填充。我知道,如果我为每个生成器消耗整个生成器输出,那么使用Dataset.padded_batch() 我可以做到这一点(然后可以根据需要将填充的批次在时间维度上分割成窗口块)。但是,我想将每个窗口传递给它变得可用的神经网络。如果其中一个生成器先于其他生成器耗尽,我想用填充值填充它,直到所有其他生成器都具有,所以我可以重置 RNN 状态并以空的初始 RNN 状态开始下一批生成器。我被困在这里是因为tf.contrib.data.parallel_interleave() 转换产生的数据集在耗尽时会丢弃每个生成器,并且时间序列不会在其中的样本之间保持一致的顺序。

这是一个小例子:

import tensorflow as tf

def stepwise_generator(length):
    for i in range(length):
        yield i

lengths = list(range(1,10,2)) # [1, 3, 5, 7, 9]

window_length = 4
batch_size = 3

dataset = tf.data.Dataset.from_tensor_slices(lengths)

gen = lambda length: tf.data.Dataset.from_generator(
    stepwise_generator, tf.float32, output_shapes=[], args=(length,)
).batch(window_length) # this batching saves window_length timesteps per generator

dataset = dataset.apply(
    tf.contrib.data.parallel_interleave(gen, cycle_length=batch_size)
)

dataset = dataset.padded_batch(batch_size, (-1,), np.inf)
# batching 3 generators at once, and padding exhausted ones with inf.
# using a batch_size value no more than cycle_length above means we
# shouldn't start a new generator mid-batch (i think)

iterator = dataset.make_one_shot_iterator()
tensor = iterator.get_next()

outs = []
with tf.Session() as sess:
    while True:
        try:
            out = sess.run(tensor)
            outs.append(out)
        except tf.errors.OutOfRangeError:
            break

print(np.asarray(outs))

输出:

[[[ 0. inf inf inf]   # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[ 4. inf inf inf]   # batch 2 - the generator in index -1 in the
  [ 0.  1.  2.  3.]   # previous batch gets cycled to index 0 and two
  [ 0.  1.  2.  3.]]  # new generators are initiated

 [[ 4.  5.  6. inf]   # batch 3 - more generator cycling, and the one in
  [ 4.  5.  6.  7.]   # index 1 also gets cycled to index 2 in the same
  [ 8. inf inf inf]]] # batch (because we have run out of generators in
                      # parallel_interleave)

我想要的输出是这样的

[[[ 0. inf inf inf]   # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[inf]               # batch 2 - the leftover timestep from a padded 
  [inf]               # batch of the first 3 generators
  [4. ]]

 [[ 0.  1.  2.  3.]   # batch 3 - only two generators are left so this is 
  [ 0.  1.  2.  3.]]  # an end-of-epoch smaller batch

 [[ 4.  5.  6. inf]   # batch 4
  [ 4.  5.  6.  7.]]

 [[inf]               # batch 5
  [ 8.]]]

这里,RNN 的内部状态将在第 2 批和第 5 批之后重置。

同样,如果我使用每个生成器的全部输出,然后是填充、批处理和切片,则可以很容易地创建所需的输出,但是我想生成批处理作为生成器,这可能是每个生成器实际接收数据-时间从例如单独的模拟,使它们可用。

【问题讨论】:

    标签: python tensorflow tensorflow-datasets


    【解决方案1】:

    TensorFlow 中的有状态 RNN 需要固定批量大小,因此您想要的输出将不起作用:批量大小从 3 变为 2。

    所以你需要有这样的东西:

    [[[ 0. inf inf inf]   # batch 1
      [ 0.  1.  2. inf]
      [ 0.  1.  2.  3.]]
    
     [[inf]               # batch 2 - the leftover timestep from a padded 
      [inf]               # batch of the first 3 generators
      [4. ]]
    
     [[ 0.  1.  2.  3.]   # batch 3 - only two generators are left
      [ 0.  1.  2.  3.]   # but we still need the same batch size
      [ inf inf inf inf]] # so this row of `inf` is needed
    
     [[ 4.  5.  6. inf]   # batch 4
      [ 4.  5.  6.  7.]
      [ inf inf inf inf]]
    
     [[inf]               # batch 5
      [ 8.]
      [inf]]]
    

    我不相信使用您的 interleave + padded_batch 方法可以做到这一点。

    但是,一种可行的方法是将所有序列填充到相同的长度。这是一个使用 TensorFlow 2.4.1 的工作示例(它应该适用于其他 TF 2 版本):

    import tensorflow as tf
    import numpy as np
    
    lengths = list(range(1,12,2)) # [1, 3, 5, 7, 9, 11]
    max_length = max(lengths)
    
    def stepwise_generator(length):
        for i in range(max_length):
            if i < length:
                yield float(i)
            else:
                yield np.inf
    
    window_length = 4
    batch_size = 3
    
    dataset = tf.data.Dataset.from_tensor_slices(lengths)
    
    gen = lambda length: tf.data.Dataset.from_generator(
        stepwise_generator, tf.float32, args=(length,)
    ).batch(window_length) # this batching saves window_length timesteps per generator
    
    dataset = dataset.interleave(gen, cycle_length=batch_size)
    dataset = dataset.batch(batch_size)
    
    for batch in dataset:
        print(batch)
    

    这给出了以下输出:

    tf.Tensor(
    [[ 0. inf inf inf]
     [ 0.  1.  2. inf]
     [ 0.  1.  2.  3.]], shape=(3, 4), dtype=float32)
    tf.Tensor(
    [[inf inf inf inf]
     [inf inf inf inf]
     [ 4. inf inf inf]], shape=(3, 4), dtype=float32)
    tf.Tensor(
    [[inf inf inf]
     [inf inf inf]
     [inf inf inf]], shape=(3, 3), dtype=float32)
    tf.Tensor(
    [[0. 1. 2. 3.]
     [0. 1. 2. 3.]
     [0. 1. 2. 3.]], shape=(3, 4), dtype=float32)
    tf.Tensor(
    [[ 4.  5.  6. inf]
     [ 4.  5.  6.  7.]
     [ 4.  5.  6.  7.]], shape=(3, 4), dtype=float32)
    tf.Tensor(
    [[inf inf inf]
     [ 8. inf inf]
     [ 8.  9. 10.]], shape=(3, 3), dtype=float32)
    

    注意事项:

    • 生成器现在处理填充,所以我们使用batch() 而不是padded_batch()
    • 序列数必须是批量大小的倍数
    • tf.contrib 包已在 TF 2 中删除,但 tf.contrib.data.parallel_interleave() 已提升为核心 API:您现在可以使用 dataset.interleave()
    • 因为这是TF 2,所以不需要dataset.make_one_shot_iterator()iterator.get_next()tf.Session()等,更简单。

    【讨论】:

      猜你喜欢
      • 2021-12-19
      • 2021-07-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-11
      • 2020-10-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多