【Question Title】: Running multiple tensorflow sessions concurrently
【Posted】: 2015-11-17 13:52:07
【Question】:

I am trying to run multiple TensorFlow sessions concurrently on a CentOS 7 machine with 64 CPUs. My colleague reports that he can use the following two blocks of code to produce a parallel speedup on his machine, using 4 cores:

mnist.py

import numpy as np
import input_data
from PIL import Image
import tensorflow as tf
import time


def main(randint):
    print 'Set new seed:', randint
    np.random.seed(randint)
    tf.set_random_seed(randint)
    mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

    # Setting up the softmax architecture
    x = tf.placeholder("float", [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)

    # Setting up the cost function
    y_ = tf.placeholder("float", [None, 10])
    cross_entropy = -tf.reduce_sum(y_*tf.log(y))
    train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)

    # Initialization 
    init = tf.initialize_all_variables()
    sess = tf.Session(
        config=tf.ConfigProto(
            inter_op_parallelism_threads=1,
            intra_op_parallelism_threads=1
        )
    )
    sess.run(init)

    for i in range(1000):
        batch_xs, batch_ys = mnist.train.next_batch(100)
        sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

    print sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels})

if __name__ == "__main__":
    t1 = time.time()
    main(0)
    t2 = time.time()
    print "time spent: {0:.2f}".format(t2 - t1)

parallel.py

import multiprocessing
import numpy as np

import mnist
import time

t1 = time.time()
# Launch three training runs in separate processes, each with its own random seed.
p1 = multiprocessing.Process(target=mnist.main, args=(np.random.randint(10000000),))
p2 = multiprocessing.Process(target=mnist.main, args=(np.random.randint(10000000),))
p3 = multiprocessing.Process(target=mnist.main, args=(np.random.randint(10000000),))
p1.start()
p2.start()
p3.start()
# Wait for all three processes to finish before measuring elapsed time.
p1.join()
p2.join()
p3.join()
t2 = time.time()
print "time spent: {0:.2f}".format(t2 - t1)

In particular, he says he observed:

Running a single process took: 39.54 seconds
Running three processes took: 54.16 seconds

However, when I run the code:

python mnist.py
==> Time spent: 5.14

python parallel.py 
==> Time spent: 37.65

As you can see, using multiprocessing gives me a significant slowdown, while my colleague sees none. Does anyone have any insight into why this might be happening, and what can be done to fix it?

EDIT

Here is some sample output. Note that loading the data appears to happen in parallel, but training the individual models looks very sequential in the output (which can be verified by watching CPU usage in top while the program executes):

#$ python parallel.py 
Set new seed: 9672406
Extracting MNIST_data/train-images-idx3-ubyte.gz
Set new seed: 4790824
Extracting MNIST_data/train-images-idx3-ubyte.gz
Set new seed: 8011659
Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.9136
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.9149
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.8931
time spent: 41.36

ANOTHER EDIT

Suppose we want to confirm that the issue is with TensorFlow and not with multiprocessing. I replaced the contents of mnist.py with a big loop, as follows:

def main(randint):
    c = 0
    for i in xrange(100000000):
        c += i

Output:

#$ python mnist.py
==> time spent: 5.16
#$ python parallel.py 
==> time spent: 4.86

I therefore think the issue here is not with multiprocessing itself.

【Question Comments】:

  • Are you using Docker? I had to give it access to all of my CPUs
  • No, I am not using Docker

Tags: python parallel-processing python-multiprocessing tensorflow


【Solution 1】:

From a comment by the OP (user1936768):

I have good news: it turns out, at least on my system, that my trial programs simply did not run long enough for the other TF instances to start up. When I put a longer-running example program in main, I do indeed see concurrent computation
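
For reference, here is a minimal harness for checking this, assuming main() in mnist.py has been given a longer-running workload (for example, more training iterations; the exact change and this script are a sketch, not from the original thread). If the sessions really execute concurrently, three processes should take far less than three times the single-process wall time:

import multiprocessing
import time

import numpy as np

import mnist


def timed_run(num_processes):
    # Run mnist.main in num_processes separate processes, each with its
    # own random seed, and return the total wall-clock time.
    t1 = time.time()
    processes = [
        multiprocessing.Process(target=mnist.main,
                                args=(np.random.randint(10000000),))
        for _ in range(num_processes)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return time.time() - t1


if __name__ == '__main__':
    print 'one process: {0:.2f}'.format(timed_run(1))
    print 'three processes: {0:.2f}'.format(timed_run(3))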

【Discussion】:

    【Solution 2】:

    One possibility is that your sessions are each trying to use 64 cores and stomping on each other. Perhaps try setting NUM_CORES to a lower value for each session:

    sess = tf.Session(
        config=tf.ConfigProto(inter_op_parallelism_threads=NUM_CORES,
                              intra_op_parallelism_threads=NUM_CORES))
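
    NUM_CORES above is a placeholder. A minimal sketch of one way to choose it, assuming the three worker processes from parallel.py on the 64-CPU machine in the question (the divide-cores-evenly heuristic is an illustration, not part of the original answer):

    import multiprocessing

    import tensorflow as tf

    # Assumption for illustration: split the machine's cores evenly across
    # the worker processes so the sessions do not oversubscribe the CPU.
    NUM_PROCESSES = 3  # matches the three processes in parallel.py
    NUM_CORES = max(1, multiprocessing.cpu_count() // NUM_PROCESSES)

    sess = tf.Session(
        config=tf.ConfigProto(inter_op_parallelism_threads=NUM_CORES,
                              intra_op_parallelism_threads=NUM_CORES))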
    

    【Discussion】:

    • In the code above (mnist.py), don't I already set both of those quantities to one?
    • Indeed, my apologies. What does your CPU utilization look like when you run the programs?
    • It looks like the three Python programs execute sequentially (rather than in parallel), one after another, with only a small delay between the end of one program and the start of the next.
    • I have added some sample output to the original post
    • Is this a problem in TensorFlow or in multiprocessing? I.e., what happens if you just use some long-running loop as the body of the function instead of TF?
    【Solution 3】:

    This can be done elegantly with Ray, a library for parallel and distributed Python, which lets you train your models in parallel from a single Python script.

    This has the advantage of letting you parallelize "classes" by turning them into "actors", which can be hard to do with regular Python multiprocessing. This matters because initializing the TensorFlow graph is often the expensive part. If you create an actor and then call the train method multiple times, the cost of initializing the graph is amortized.

    import numpy as np
    from tensorflow.examples.tutorials.mnist import input_data
    from PIL import Image
    import ray
    import tensorflow as tf
    import time
    
    
    @ray.remote
    class TrainingActor(object):
        def __init__(self, seed):
            print('Set new seed:', seed)
            np.random.seed(seed)
            tf.set_random_seed(seed)
            self.mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
    
            # Setting up the softmax architecture.
            self.x = tf.placeholder('float', [None, 784])
            W = tf.Variable(tf.zeros([784, 10]))
            b = tf.Variable(tf.zeros([10]))
            self.y = tf.nn.softmax(tf.matmul(self.x, W) + b)
    
            # Setting up the cost function.
            self.y_ = tf.placeholder('float', [None, 10])
            cross_entropy = -tf.reduce_sum(self.y_*tf.log(self.y))
            self.train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
    
            # Initialization
            self.init = tf.initialize_all_variables()
            self.sess = tf.Session(
                config=tf.ConfigProto(
                    inter_op_parallelism_threads=1,
                    intra_op_parallelism_threads=1
                )
            )
    
        def train(self):
            self.sess.run(self.init)
    
            for i in range(1000):
                batch_xs, batch_ys = self.mnist.train.next_batch(100)
                self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys})
    
            correct_prediction = tf.equal(tf.argmax(self.y, 1), tf.argmax(self.y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
    
            return self.sess.run(accuracy, feed_dict={self.x: self.mnist.test.images,
                                                      self.y_: self.mnist.test.labels})
    
    
    if __name__ == '__main__':
        # Start Ray.
        ray.init()
    
        # Create 3 actors.
        training_actors = [TrainingActor.remote(seed) for seed in range(3)]
    
        # Make them all train in parallel.
        accuracy_ids = [actor.train.remote() for actor in training_actors]
        print(ray.get(accuracy_ids))
    
        # Start new training runs in parallel.
        accuracy_ids = [actor.train.remote() for actor in training_actors]
        print(ray.get(accuracy_ids))
    

    If you want to create only one copy of the dataset instead of having every actor read it, you can rewrite the code as follows. Under the hood, this uses the Plasma shared memory object store and the Apache Arrow data format.

    @ray.remote
    class TrainingActor(object):
        def __init__(self, mnist, seed):
            self.mnist = mnist
            ...
    
        ...
    
    if __name__ == "__main__":
        ray.init()
    
        # Read the mnist dataset and put it into shared memory once
        # so that workers don't create their own copies.
        mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
        mnist_id = ray.put(mnist)
    
        training_actors = [TrainingActor.remote(mnist_id, seed) for seed in range(3)]
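
    Passing mnist_id into the actor constructor works because Ray resolves object IDs that are passed as arguments to remote calls, so each actor receives the single shared copy of the dataset from the object store rather than re-reading it from disk.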
    

    You can find more information in the Ray documentation. Note that I am one of the Ray developers.

    【Discussion】:
