【发布时间】:2018-11-09 20:52:32
【问题描述】:
我正在研究一个时间序列问题,其中每个时间序列都相当长(10^3-10^4 时间步长,每个时间序列的长度不同)。
对于每个序列,我可以定义一个 Python 生成器,它一次生成一个时间步长的值。我正在使用tf.data.Dataset.from_generator() 构造函数将这些生成器包装到 tf.data API 中。文档建议使用 from_generator() 和 tf.contrib.data.parallel_interleave() 转换来并行从我的 Python 生成器中提取。
我对这些数据的下游使用是有状态的 RNN(例如 LSTM 或 GRU)。我想将时间序列分成更小的(~10^2)窗口,并将每个块用作训练示例(即截断的 BPTT)。由于我的数据是流式传输的,我认为这意味着在将每个生成器通过管道传递之前保存 window_size 时间步长,以便与其他生成器的数据进行批处理。我还想跨这些块保存 RNN 状态,以便我仍然可以学习长期依赖关系。
我的问题是想要创建这些生成器的批量输出的填充批次。理想情况下,我想向我的神经网络展示生成器输出的窗口,并在生成器的某些子集先于其他子集耗尽自身时根据需要进行填充。我知道,如果我为每个生成器消耗整个生成器输出,那么使用Dataset.padded_batch() 我可以做到这一点(然后可以根据需要将填充的批次在时间维度上分割成窗口块)。但是,我想将每个窗口传递给它变得可用的神经网络。如果其中一个生成器先于其他生成器耗尽,我想用填充值填充它,直到所有其他生成器都具有,所以我可以重置 RNN 状态并以空的初始 RNN 状态开始下一批生成器。我被困在这里是因为tf.contrib.data.parallel_interleave() 转换产生的数据集在耗尽时会丢弃每个生成器,并且时间序列不会在其中的样本之间保持一致的顺序。
这是一个小例子:
import tensorflow as tf
def stepwise_generator(length):
for i in range(length):
yield i
lengths = list(range(1,10,2)) # [1, 3, 5, 7, 9]
window_length = 4
batch_size = 3
dataset = tf.data.Dataset.from_tensor_slices(lengths)
gen = lambda length: tf.data.Dataset.from_generator(
stepwise_generator, tf.float32, output_shapes=[], args=(length,)
).batch(window_length) # this batching saves window_length timesteps per generator
dataset = dataset.apply(
tf.contrib.data.parallel_interleave(gen, cycle_length=batch_size)
)
dataset = dataset.padded_batch(batch_size, (-1,), np.inf)
# batching 3 generators at once, and padding exhausted ones with inf.
# using a batch_size value no more than cycle_length above means we
# shouldn't start a new generator mid-batch (i think)
iterator = dataset.make_one_shot_iterator()
tensor = iterator.get_next()
outs = []
with tf.Session() as sess:
while True:
try:
out = sess.run(tensor)
outs.append(out)
except tf.errors.OutOfRangeError:
break
print(np.asarray(outs))
输出:
[[[ 0. inf inf inf] # batch 1
[ 0. 1. 2. inf]
[ 0. 1. 2. 3.]]
[[ 4. inf inf inf] # batch 2 - the generator in index -1 in the
[ 0. 1. 2. 3.] # previous batch gets cycled to index 0 and two
[ 0. 1. 2. 3.]] # new generators are initiated
[[ 4. 5. 6. inf] # batch 3 - more generator cycling, and the one in
[ 4. 5. 6. 7.] # index 1 also gets cycled to index 2 in the same
[ 8. inf inf inf]]] # batch (because we have run out of generators in
# parallel_interleave)
我想要的输出是这样的
[[[ 0. inf inf inf] # batch 1
[ 0. 1. 2. inf]
[ 0. 1. 2. 3.]]
[[inf] # batch 2 - the leftover timestep from a padded
[inf] # batch of the first 3 generators
[4. ]]
[[ 0. 1. 2. 3.] # batch 3 - only two generators are left so this is
[ 0. 1. 2. 3.]] # an end-of-epoch smaller batch
[[ 4. 5. 6. inf] # batch 4
[ 4. 5. 6. 7.]]
[[inf] # batch 5
[ 8.]]]
这里,RNN 的内部状态将在第 2 批和第 5 批之后重置。
同样,如果我使用每个生成器的全部输出,然后是填充、批处理和切片,则可以很容易地创建所需的输出,但是我想生成批处理作为生成器,这可能是每个生成器实际接收数据-时间从例如单独的模拟,使它们可用。
【问题讨论】:
标签: python tensorflow tensorflow-datasets