Distributed TensorFlow study notes (not the final version)
SO_How does ps work in distribute Tensorflow?
_A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph. A cluster can also be divided into one or more "jobs", where each job contains one or more tasks._
Creating a tf.train.ClusterSpec
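A cluster is described by a dictionary that maps each job name to the list of network addresses of its tasks. We pass this dictionary to the tf.train.ClusterSpec constructor.
A tf.train.Server contains a set of local devices, a set of connections to the other tasks described in its tf.train.ClusterSpec, and a tf.Session that can use these to perform a distributed computation. For example: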
    import tensorflow as tf  # TensorFlow 1.x API; each task runs in its own process

    # In task 0:
    cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
    server = tf.train.Server(cluster, job_name="local", task_index=0)

    # In task 1:
    cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
    server = tf.train.Server(cluster, job_name="local", task_index=1)
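To make the dictionary mapping concrete, here is a minimal plain-Python sketch (no TensorFlow needed) of how a (job name, task index) pair resolves to a network address. The helper `task_address` is a hypothetical name used only for illustration and is not part of the TensorFlow API. The `ps`/`worker` host names are made up.

```python
# Plain-Python illustration of what a cluster dictionary encodes.
# `task_address` is a hypothetical helper, not a TensorFlow function.

def task_address(cluster, job_name, task_index):
    """Return the "host:port" string of task `task_index` in job `job_name`."""
    tasks = cluster[job_name]  # list of addresses, one entry per task
    if not 0 <= task_index < len(tasks):
        raise IndexError("job %r has no task %d" % (job_name, task_index))
    return tasks[task_index]


# Same single-job layout as the ClusterSpec example above.
local_cluster = {"local": ["localhost:2222", "localhost:2223"]}

# A two-job layout with made-up host names (assumption, for illustration).
ps_worker_cluster = {
    "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
}

if __name__ == "__main__":
    print(task_address(local_cluster, "local", 1))
    print(task_address(ps_worker_cluster, "worker", 0))
```

The task index is simply the position in the job's address list, which is why every task must be started with the same dictionary.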
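    # Parameters of the first layer live on ps task 0.
    with tf.device("/job:ps/task:0"):
        weights_1 = tf.Variable(...)
        biases_1 = tf.Variable(...)

    # Parameters of the second layer live on ps task 1.
    with tf.device("/job:ps/task:1"):
        weights_2 = tf.Variable(...)
        biases_2 = tf.Variable(...)

    # The compute-intensive part of the model runs on one worker task.
    with tf.device("/job:worker/task:7"):
        input, labels = ...
        layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
        logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
        # ...
        train_op = ...

    # Connect to the worker's server and run the training loop.
    with tf.Session("grpc://worker7.example.com:2222") as sess:
        for _ in range(10000):
            sess.run(train_op)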
In the above example, the variables are created on two tasks in the `ps` job, and the compute-intensive part of the model is created in the `worker` job. TensorFlow will insert the appropriate data transfers between the jobs (from `ps` to `worker` for the forward pass, and from `worker` to `ps` for applying gradients).
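In other words, the variables are created on two tasks of the ps job, while the compute-intensive part of the model is created on the worker job.
TensorFlow automatically inserts the transfers needed for ps and worker to communicate (from ps to worker during the forward pass, and from worker to ps when applying gradients).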
Anyway, both "worker" and "ps" are _tasks_ (or _jobs_ rather, which are just groups of _tasks_), so they're really no different. The difference is what they're supposed to be used for. The idea is that state (e.g. `tf.Variable`) should be on the parameter servers, while operations for calculating state should be on the workers. Instead of achieving this by calling `tf.device` manually everywhere, a helper function named [`tf.train.replica_device_setter`](https://www.tensorflow.org/api_docs/python/train/distributed_execution#replica_device_setter) is used that sets a `tf.Variable`'s device to a parameter server, and the other operations to a worker.
`server.join()` just means that parameter servers will wait on the workers, instead of terminating their client processes immediately.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster_spec)):
        v1 = tf.Variable(...)  # Automatically assigned to a parameter server.
        train_op = ...  # Automatically assigned to the worker.
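This mostly matches my earlier understanding. The key point is the function tf.train.replica_device_setter: it is the recommended way to assign devices in a distributed setup, instead of pinning every op by hand.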
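As a rough mental model of that placement rule, the plain-Python sketch below sends variables to ps tasks in round-robin order and everything else to the worker. It is not TensorFlow code: `make_device_setter` and the op-type strings are hypothetical, and round-robin is assumed here because it is, as far as I know, the default strategy of `tf.train.replica_device_setter`.

```python
# Toy model of the placement rule: variables go to ps tasks in round-robin
# order, every other op goes to the worker device.
# Not TensorFlow code; `make_device_setter` is a hypothetical name.

def make_device_setter(num_ps_tasks, worker_device):
    next_ps = [0]  # mutable counter captured by the closure below

    def choose_device(op_type):
        if op_type == "Variable":
            device = "/job:ps/task:%d" % next_ps[0]
            next_ps[0] = (next_ps[0] + 1) % num_ps_tasks
            return device
        return worker_device

    return choose_device


if __name__ == "__main__":
    choose = make_device_setter(num_ps_tasks=2,
                                worker_device="/job:worker/task:7")
    ops = [("weights_1", "Variable"), ("biases_1", "Variable"),
           ("weights_2", "Variable"), ("matmul", "MatMul")]
    for name, op_type in ops:
        print(name, "->", choose(op_type))
```

Because the assignment depends only on the order in which variables are created, every worker that builds the same graph ends up with the same variable-to-ps mapping.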
Replicated training
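A common training configuration, called "data parallelism", involves multiple tasks in a worker job training the same model on different mini-batches of data and updating shared parameters hosted in the ps job. There are many ways to specify this structure in TensorFlow, and there are library helpers that simplify setting up replicated training. Possible approaches include: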
* **In-graph replication.** In this approach, the client builds a single `tf.Graph` that contains one set of parameters (in `tf.Variable` nodes pinned to `/job:ps`); and multiple copies of the compute-intensive part of the model, each pinned to a different task in `/job:worker`.
* **Between-graph replication.** In this approach, there is a separate client for each `/job:worker` task, typically in the same process as the worker task. Each client builds a similar graph containing the parameters (pinned to `/job:ps` as before using [`tf.train.replica_device_setter`](https://www.tensorflow.org/api_docs/python/tf/train/replica_device_setter) to map them deterministically to the same tasks); and a single copy of the compute-intensive part of the model, pinned to the local task in `/job:worker`.
* **Asynchronous training.** In this approach, each replica of the graph has an independent training loop that executes without coordination. It is compatible with both forms of replication above.
* **Synchronous training.** In this approach, all of the replicas read the same values for the current parameters, compute gradients in parallel, and then apply them together. It is compatible with in-graph replication (e.g. using gradient averaging as in the [CIFAR-10 multi-GPU trainer](https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10/cifar10_multi_gpu_train.py)), and between-graph replication (e.g. using the [`tf.train.SyncReplicasOptimizer`](https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer)).
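The difference between the synchronous and asynchronous update rules can be sketched with a toy simulation. This is plain Python with a single scalar parameter, not TensorFlow, and all function names are hypothetical. The asynchronous variant is simulated sequentially, so it only models the absence of gradient aggregation, not real concurrency or stale reads.

```python
# Toy simulation of data-parallel training with one shared parameter.
# Plain Python, no TensorFlow; all names here are hypothetical.

def batch_gradient(w, batch):
    """Gradient of the mean of 0.5 * (w - x)**2 over the mini-batch."""
    return sum(w - x for x in batch) / len(batch)


def sync_step(w, batches, lr):
    """All replicas read the same w; gradients are averaged and applied once."""
    grads = [batch_gradient(w, b) for b in batches]
    return w - lr * sum(grads) / len(grads)


def async_step(w, batches, lr):
    """Each replica reads the current w and applies its own update at once."""
    for b in batches:
        w = w - lr * batch_gradient(w, b)
    return w


if __name__ == "__main__":
    batches = [[1.0, 2.0], [3.0, 4.0]]  # one mini-batch per worker replica
    w_sync = w_async = 0.0
    for _ in range(200):
        w_sync = sync_step(w_sync, batches, lr=0.1)
        w_async = async_step(w_async, batches, lr=0.1)
    print("sync: ", w_sync)
    print("async:", w_async)
```

With these made-up numbers the synchronous version settles at the overall data mean, while the sequential asynchronous version settles slightly off it, because each replica's update is computed from a different parameter value.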