【问题标题】:Tensorflow : Using Queue for CSV file with custom Estimator and "input_fn" functionTensorflow:使用自定义 Estimator 和“input_fn”函数的 CSV 文件队列
【发布时间】:2017-08-30 15:56:12
【问题描述】:

我在很长一段时间(几个小时)内搜索了我的问题的正确答案,但没有结果,所以我来了。我想我遗漏了一些明显的东西,但我不知道是什么......

问题:使用队列读取 CSV 文件并使用 input_fn 训练 Estimator,而无需每次都重新加载 Graph(这非常慢)。


我创建了一个自定义模型,它为我提供了一个 model_fn 函数来创建我自己的估算器:

tf.estimator.Estimator(model_fn=model_fn, params=model_params)

之后,我需要读取一个非常大的 CSV 文件(无法加载到内存中),所以我决定使用 Queue(似乎是最好的解决方案):

nb_features = 10
queue = tf.train.string_input_producer(["test.csv"],
                                       shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(queue)

record_defaults = [[0] for _ in range(nb_features+1)]
cols = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack(cols[0:len(cols)-1]) # Take all columns without the last
label = tf.stack(cols[len(cols)-1]) # Take last column

我觉得这段代码没问题。


然后,主要代码:

with tf.Session() as sess:
    tf.logging.set_verbosity(tf.logging.INFO)
    sess.run(tf.global_variables_initializer())

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # Return a Tensor of 1000 features/labels
    def get_inputs():
        print("input call !")
        xs = []
        ys = []
        for i in range(1000):
            x, y = sess.run([features, label])
            xs.append(x)
            ys.append(y)
        return tf.constant(np.asarray(xs), dtype=tf.float32), tf.constant(np.asarray(ys))

    estimator.train(input_fn=get_inputs,
                   steps=100)

    coord.request_stop()
    coord.join(threads)

如你所见,这里有很多丑陋的东西......

我想要什么:我希望 train 函数在每个步骤中使用一批新的特征。但是在这里,它在 100 个步骤中使用了相同批次的 1000 个特征,因为 get_inputs 函数只是在我们开始训练时调用。有没有简单的方法来做到这一点?

我尝试使用 step=1 循环 estimator.train,但这每次都会重新加载图表并且变得非常慢。

我现在不知道该怎么做,也不知道有没有可能..

谢谢你帮助我!

【问题讨论】:

    标签: python-3.x csv machine-learning tensorflow


    【解决方案1】:

    短版:将您的 CSV 文件转换为 tfrecords,然后使用 tf.contrib.data.TFRecordDataset。长版:见代码见问题/接受的答案here(为方便起见,复制如下)。


    查看tf.contrib.data.Dataset API。我怀疑您最好将 CSV 转换为 TfRecord 文件并使用 TfRecordDataset。这里有详尽的教程。

    第1步:将csv数据转换为tfrecords数据。示例代码如下。

    import tensorflow as tf
    
    
    def read_csv(filename):
        with open(filename, 'r') as f:
            out = [line.rstrip().split(',') for line in f.readlines()]
        return out
    
    
    csv = read_csv('data.csv')
    with tf.python_io.TFRecordWriter("data.tfrecords") as writer:
        for row in csv:
            features, label = row[:-1], row[-1]
            features = [float(f) for f in features]
            label = int(label)
            example = tf.train.Example()
            example.features.feature[
                "features"].float_list.value.extend(features)
            example.features.feature[
                "label"].int64_list.value.append(label)
            writer.write(example.SerializeToString())
    

    这假设标签是最后一列中的整数,前一列中具有浮点特征。这只需要运行一次。

    第 2 步:编写一个数据集来解码这些记录文件。

    def parse_function(example_proto):
        features = {
            'features': tf.FixedLenFeature((n_features,), tf.float32),
            'label': tf.FixedLenFeature((), tf.int64)
        }
        parsed_features = tf.parse_single_example(example_proto, features)
        return parsed_features['features'], parsed_features['label']
    
    
    def input_fn():
        dataset = tf.contrib.data.TFRecordDataset(['data.tfrecords'])
        dataset = dataset.map(parse_function)
        dataset = dataset.shuffle(shuffle_size)
        dataset = dataset.repeat()  # repeat indefinitely
        dataset = dataset.batch(batch_size)
        print(dataset.output_shapes)
        features, label = dataset.make_one_shot_iterator().get_next()
        return features, label
    

    测试(独立于估计器):

    batch_size = 4
    shuffle_size = 10000
    features, labels = input_fn()
    with tf.Session() as sess:
        f_data, l_data = sess.run([features, labels])
    print(f_data, l_data)
    

    用于 tf.estimator.Estimator:

    estimator.train(input_fn, max_steps=1e7)
    

    【讨论】:

    • 您好,感谢您的回答。我已经看过你的帖子了,但是这个解决方案需要将 CSV 转换为 TFRecord,我想知道我们是否不能避免只使用队列和批处理功能。
    • 你可以......但我很确定这就是数据集在幕后所做的一切。您可以在训练时直接从 csv 读取并解析字符串数据,但速度会慢一些。我了解 tfrecords 的唯一缺点是它们占用了额外的空间。这真的是一个问题吗?如果您正在加载图像,您可以将路径保存在 tfrecords 中,然后正常从文件中读取。
    • 不,在TFrecord中复制并不重要,这只是我个人的知识(因为我在这个问题上浪费了很多时间哈哈)。您是否有任何想法从 CSV 进行而不进行转换?使用您的解决方案,所有数据集都没有加载到内存中吗?只是当前的批次,对吧?我明天试试这个
    • 肯定有办法,但我非常怀疑这是否是最好的办法。我花了很长时间尝试创建一个按照我想要的方式工作的数据管道,然后我才意识到更大的团队中有更聪明的人做得更好,并考虑了更多功能的边缘案例:)。除此之外,还有比数据io更有趣的问题
    • 好吧好吧,你赢了。我也厌倦了这个。再次感谢:)
    【解决方案2】:

    如果您担心tf.train.start_queue_runners 未被调用,请尝试以下操作:

    class ThreadStartHook(tf.train.SessionRunHook):
        def after_create_session(self, session, coord):
            self.coord = coord
            self.threads = tf.train.start_queue_runners(coord=coord, sess=session)
    
        def end(self, session):
            self.coord.request_stop()
            self.coord.join(self.threads)
    
    
    estimator.train(input_fn, [ThreadStartHook()])
    

    刚开始的时候也有类似的想法,后来发现没必要。

    【讨论】:

    • 好的!我认为这段代码可以解决问题。但另一个出现......(否则它不好玩是啊)。 “通过的 Tensor("sparse_softmax_cross_entropy_loss/value:0", shape=(), dtype=float32) 应该具有等于当前图的图属性”。我现在会调查这个错误。
    • 好吧,我明白了……其实我们需要在 tf.train.batch 之后调用“start_queue_runners”,实习生队列不会启动! (经过测试和批准)
    猜你喜欢
    • 1970-01-01
    • 2019-01-06
    • 2018-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-28
    • 2018-09-25
    相关资源
    最近更新 更多