【问题标题】:FLAGS and parsers in tensorflow distributed training张量流分布式训练中的 FLAGS 和解析器
【发布时间】:2018-10-20 09:26:32
【问题描述】:

所以我试图了解 tensorflow 中的分布式训练。为了练习自己,我尝试了https://github.com/hn826/distributed-tensorflow/blob/master/distributed-deep-mnist.py的代码

import argparse
import sys

from tensorflow.examples.tutorials.mnist import input_data
import tensorflow as tf

FLAGS = None

def deepnn(x):
  x_image = tf.reshape(x, [-1, 28, 28, 1])

  W_conv1 = weight_variable([5, 5, 1, 32])
  b_conv1 = bias_variable([32])
  h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

  h_pool1 = max_pool_2x2(h_conv1)

  W_conv2 = weight_variable([5, 5, 32, 64])
  b_conv2 = bias_variable([64])
  h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

  h_pool2 = max_pool_2x2(h_conv2)

  W_fc1 = weight_variable([7 * 7 * 64, 1024])
  b_fc1 = bias_variable([1024])

  h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
  h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

  keep_prob = tf.placeholder(tf.float32)
  h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

  W_fc2 = weight_variable([1024, 10])
  b_fc2 = bias_variable([10])

  y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
  return y_conv, keep_prob

def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

def weight_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial)

def bias_variable(shape):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial)

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Import data
      mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

      # Build Deep MNIST model...
      x = tf.placeholder(tf.float32, [None, 784])
      y_ = tf.placeholder(tf.float32, [None, 10])
      y_conv, keep_prob = deepnn(x)

      cross_entropy = tf.reduce_mean(
          tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y_conv))

      global_step = tf.contrib.framework.get_or_create_global_step()

      train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy, global_step=global_step)
      correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
      accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    # The StopAtStepHook handles stopping after running given steps.
    hooks=[tf.train.StopAtStepHook(last_step=1000)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir=FLAGS.log_dir,
                                           hooks=hooks) as mon_sess:
      i = 0
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        batch = mnist.train.next_batch(50)
        if i % 100 == 0:
          train_accuracy = mon_sess.run(accuracy, feed_dict={
              x: batch[0], y_: batch[1], keep_prob: 1.0})
          print('global_step %s, task:%d_step %d, training accuracy %g'
                % (tf.train.global_step(mon_sess, global_step), FLAGS.task_index, i, train_accuracy))
        mon_sess.run(train_step, feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5})
        i = i + 1

if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  # Flags for specifying input/output directories
  parser.add_argument(
      "--data_dir",
      type=str,
      default="/tmp/mnist_data",
      help="Directory for storing input data")
  parser.add_argument(
      "--log_dir",
      type=str,
      default="/tmp/train_logs",
      help="Directory for train logs")
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

除了一些概念外,我已经了解了大部分内容。
首先,关于FLAGS。据我了解,任务和工人都在其中定义。但我很困惑如何。

其次,关于解析器。它们是什么,为什么我们在这里使用它们?我已经意识到,在终端中运行代码时,parser.add_argument() 会为您提供选项。

我猜parserFLAGS 是有某种联系的。所以知道他们做了什么,可能会赶走我脑海中的所有问号。

【问题讨论】:

    标签: python parsing tensorflow distributed-computing argparse


    【解决方案1】:

    首先,关于FLAGS。据我了解,任务和工人都在其中定义。但我很困惑如何。

    是的,这是在分布式设置中运行 tensorflow 的标准方式(您的特殊情况是 Between-Graph Replication 策略)。基本上,相同的脚本会启动不同的节点(worker、参数服务器等),它们一起执行训练。 This tutorial 讨论了 tensorflow 中的各种策略,并很好地解释了它是如何转化为代码的。

    这是一个如何使用此脚本的示例。启动 4 个进程(2 个 ps 服务器和 2 个 worker):

    # On ps0.example.com:
    $ python trainer.py \
         --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
         --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
         --job_name=ps --task_index=0
    # On ps1.example.com:
    $ python trainer.py \
         --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
         --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
         --job_name=ps --task_index=1
    # On worker0.example.com:
    $ python trainer.py \
         --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
         --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
         --job_name=worker --task_index=0
    # On worker1.example.com:
    $ python trainer.py \
         --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
         --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
         --job_name=worker --task_index=1
    

    其次,关于解析器。它们是什么,为什么我们在这里使用它们?

    这是处理命令行参数的python方式:argparse。不同的选项允许为每个参数指定类型和边界(从而定义验证器)、分配操作等等(查看可用功能的文档)。然后解析器获取命令行字符串,只需一次调用即可神奇地设置变量:

    FLAGS, unparsed = parser.parse_known_args()
    

    【讨论】:

    • 感谢十亿,非常直观!还是有点困惑。当我们将任务分配给工作人员时,with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):FLAGS.task_index 一个列表吗?如果FLAGS.task_index 包含多个值,这是否意味着我们同时创建了所有工人?还是FLAGS 中的值在图中迭代?例如if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": 这里,看着它让我说我们遍历了所有job_names
    • @Huzo task_index 是一个整数(参见标志定义),因此每个进程有一个任务。但是您可以运行多个进程,具有不同的任务索引和作业名称。我链接的教程中有一个示例。
    • 啊,我现在明白了。我一直认为程序存储了所有的终端输入,然后将其作为一个整体提供给FLAGS。但是每次输入命令时它都会执行它。非常感谢
    猜你喜欢
    • 2020-09-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-28
    • 1970-01-01
    • 2018-11-05
    相关资源
    最近更新 更多