TensorFlow Input Pipeline Study Notes

References:

tf_doc_Reading data

TENSORFLOW INPUT PIPELINE EXAMPLE

tensorflow:理解tensorflow中的输入管道

The last reference is a Chinese translation of the English article above. The translation quality is mediocre, so I recommend reading the original; it is not very long.

Below are the key parts I picked out of the article, plus my own understanding.

TL;DR

An input pipeline example, suitable for datasets that are not very large.

Load Data in TensorFlow

An input pipeline can be understood as a way of loading data. In general there are two ways to load data. The more traditional one uses feeding; it does not work for large datasets, because it requires loading all the data into memory. For that case TensorFlow provides the input pipeline way of reading data.
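To make the contrast concrete, here is a pure-Python sketch (no TensorFlow; the function names are only illustrative) of the two loading styles: feeding materializes the whole dataset in memory first, while a pipeline streams it in bounded batches.

```python
def feed_style(rows):
    # feed style: the whole dataset is materialized in memory first,
    # then handed to the graph batch by batch via feed_dict
    return list(rows)

def pipeline_style(rows, batch_size):
    # pipeline style: rows are streamed and batched lazily,
    # so memory use is bounded by batch_size
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

print(feed_style(range(10)))              # one big list
print(list(pipeline_style(range(10), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```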

The input pipeline handles the CSV files, decodes the file format, restructures the data, shuffles it, performs data augmentation or other preprocessing, and then uses threads to load the data into batches.
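The stages listed above can be sketched in plain Python (illustrative only, not the TF API): parse each CSV line, shuffle the records, then batch them.

```python
import random

def parse_csv(lines):
    # decode each "path,label" CSV line into a (path, int_label) record
    for line in lines:
        path, label = line.strip().split(",")
        yield path, int(label)

def shuffled(records, seed=0):
    # shuffle the records (seeded here only so the sketch is repeatable)
    records = list(records)
    random.Random(seed).shuffle(records)
    return records

def batched(records, batch_size):
    # group consecutive records into batches of batch_size
    for i in range(0, len(records), batch_size):
        yield records[i:i + batch_size]

lines = ["img0.jpg,5", "img1.jpg,0", "img2.jpg,4", "img3.jpg,1"]
records = shuffled(parse_csv(lines))
for batch in batched(records, 2):
    print(batch)
```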

Load the Label Data

Make sure the dataset and CSV file paths are correct.

Then process them to obtain the train and test labels.

Since we are only reading the data and do not actually intend to train on it, we skip one-hot encoding and instead convert the label strings (which consist of digit characters anyway) directly to int.
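For comparison, here is a minimal sketch of both encodings; encode_label is the article's, while the one_hot helper is hypothetical and shown only to illustrate what we are skipping.

```python
def encode_label(label):
    # the article's encoding: digit string -> int
    return int(label)

def one_hot(label, num_classes=10):
    # hypothetical one-hot encoding, shown only for comparison:
    # a vector of zeros with a single 1 at the label's index
    vec = [0] * num_classes
    vec[int(label)] = 1
    return vec

print(encode_label("7"))  # 7
print(one_hot("7"))       # [0, 0, 0, 0, 0, 0, 0, 1, 0, 0]
```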

def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)
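As a sanity check, read_label_file can be exercised against a small temporary CSV (restated here, with an added close, so the example is self-contained; the file contents are made up for illustration). Note that int() tolerates the trailing newline left on the label by split(",").

```python
import os
import tempfile

def encode_label(label):
    return int(label)

def read_label_file(file):
    # same logic as above; f.close() added for tidiness
    f = open(file, "r")
    filepaths = []
    labels = []
    for line in f:
        filepath, label = line.split(",")
        filepaths.append(filepath)
        labels.append(encode_label(label))
    f.close()
    return filepaths, labels

# write a tiny illustrative label file
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write("images/img0.jpg,5\n")
    tmp.write("images/img1.jpg,0\n")
    path = tmp.name

filepaths, labels = read_label_file(path)
os.remove(path)
print(filepaths)  # ['images/img0.jpg', 'images/img1.jpg']
print(labels)     # [5, 0]
```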

Do Some Optional Processing on Our String Lists

# transform relative path into full path
train_filepaths = [ dataset_path + fp for fp in train_filepaths]
test_filepaths = [ dataset_path + fp for fp in test_filepaths]

# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels

# we limit the number of files to 20 to make the output more clear!
all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]

Start Building the Pipeline

Make sure the tensor's dtype matches the type of the data in the list.

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)

Let's Partition the Data

This is an optional step. Since our dataset may be fairly large, we first split it into a train set and a test set.

A visualization of the dynamic partition function in tensorflow.

# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)
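tf.dynamic_partition can be emulated in plain Python to see what it does to our lists (a sketch of the semantics, not the real op): partitions[i] says which output list element i goes to, with relative order preserved.

```python
def dynamic_partition(data, partitions, num_partitions):
    # route element i to output list partitions[i], preserving order
    out = [[] for _ in range(num_partitions)]
    for item, p in zip(data, partitions):
        out[p].append(item)
    return out

all_images = ["a.jpg", "b.jpg", "c.jpg", "d.jpg", "e.jpg"]
partitions = [0, 1, 0, 1, 0]  # e.g. after random.shuffle
train_images, test_images = dynamic_partition(all_images, partitions, 2)
print(train_images)  # ['a.jpg', 'c.jpg', 'e.jpg']
print(test_images)   # ['b.jpg', 'd.jpg']
```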

Build the Input Queues and Define How to Load Images

slice_input_producer slices the tensors into individual examples, which threads then queue up.

shuffle controls whether the data is shuffled; here we do not shuffle.
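Conceptually, slice_input_producer turns the parallel (images, labels) tensors into single (image, label) examples and enqueues them. A rough plain-Python sketch of that idea (the real op runs background threads and can cycle through the data indefinitely):

```python
import queue

def slice_input_producer(images, labels, shuffle=False):
    # slice the parallel lists into single (image, label) examples
    # and put them on a queue; shuffle=False keeps the original order
    q = queue.Queue()
    for example in zip(images, labels):
        q.put(example)
    return q

q = slice_input_producer(["a.jpg", "b.jpg", "c.jpg"], [5, 0, 4])
examples = []
while not q.empty():
    examples.append(q.get())
print(examples)  # [('a.jpg', 5), ('b.jpg', 0), ('c.jpg', 4)]
```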

# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]

Group Samples into Batches

Training on a single sample at a time is inefficient; the usual practice is to group several samples into a batch and train on them together. The number of samples in each batch is the so-called batch_size.

So far we have only described the rough shape of the pipeline, but TensorFlow still does not know the shape of our image data. Before grouping samples into batches with tf.train.batch, we first need to define the shape of the image tensor.

# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])

# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
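A quick plain-Python illustration of the shape bookkeeping: batching stacks BATCH_SIZE examples of shape [H, W, C] into a single [BATCH_SIZE, H, W, C] tensor, which only works if every example has the same, known shape, hence the set_shape calls above.

```python
# constants from this example
IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS = 28, 28, 3
BATCH_SIZE = 5

example_shape = [IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS]
# stacking BATCH_SIZE identical examples prepends a batch dimension
batch_shape = [BATCH_SIZE] + example_shape
print(batch_shape)  # [5, 28, 28, 3]
```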

Run the Queue Runners and Start a Session

At this point the input pipeline is fully built, but to get it running we still need threads: the threads fill the queues and move the data into TensorFlow objects.
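The coordinator/queue-runner mechanics boil down to a producer-consumer pattern, sketched here with Python's own threading (the names are illustrative, not TensorFlow API): a background thread keeps a small bounded queue full while the main thread consumes from it, and stop/join mirror coord.request_stop() / coord.join(threads).

```python
import queue
import threading

q = queue.Queue(maxsize=2)   # small bounded queue, like a TF input queue
stop = threading.Event()

def queue_runner():
    # producer: keep refilling the queue until asked to stop
    i = 0
    while not stop.is_set():
        try:
            q.put("batch-%d" % i, timeout=0.1)  # wait briefly if full
            i += 1
        except queue.Full:
            pass  # queue still full: loop and re-check the stop flag

t = threading.Thread(target=queue_runner)
t.start()

got = [q.get() for _ in range(3)]  # consumer: main thread pulls batches
print(got)                         # ['batch-0', 'batch-1', 'batch-2']

stop.set()  # analogous to coord.request_stop()
t.join()    # analogous to coord.join(threads)
```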

with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print "from the train set:"
  for i in range(20):
    print sess.run(train_label_batch)

  print "from the test set:"
  for i in range(10):
    print sess.run(test_label_batch)

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()

As the output below shows, TensorFlow does not keep track of epochs (one epoch is one full pass over the data), so you have to count them yourself.
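A minimal sketch of manual epoch counting, using the sizes from this example (20 samples, 5 held out for test, batch size 5): derive the epoch from the number of batches consumed.

```python
num_samples = 15  # 20 examples minus the 5-sample test partition
batch_size = 5
batches_per_epoch = num_samples // batch_size

for step in range(1, 7):
    # integer-divide completed steps by batches per epoch
    epoch = (step - 1) // batches_per_epoch
    print("step %d -> epoch %d" % (step, epoch))
```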

(tf-env)worker1:~$ python mnist_feed.py 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
input pipeline ready
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:900] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_init.cc:102] Found device 0 with properties: 
name: GeForce GTX 960
major: 5 minor: 2 memoryClockRate (GHz) 1.253
pciBusID 0000:01:00.0
Total memory: 2.00GiB
Free memory: 1.77GiB
I tensorflow/core/common_runtime/gpu/gpu_init.cc:126] DMA: 0 
I tensorflow/core/common_runtime/gpu/gpu_init.cc:136] 0:   Y 
I tensorflow/core/common_runtime/gpu/gpu_device.cc:755] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 960, pci bus id: 0000:01:00.0)
from the train set:
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
from the test set:
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]

Complete Code for this example

# Example on how to use the tensorflow input pipelines. The explanation can be found here ischlag.github.io.
import tensorflow as tf
import random
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes

dataset_path      = "/path/to/your/dataset/mnist/"
test_labels_file  = "test-labels.csv"
train_labels_file = "train-labels.csv"

test_set_size = 5

IMAGE_HEIGHT  = 28
IMAGE_WIDTH   = 28
NUM_CHANNELS  = 3
BATCH_SIZE    = 5

def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)

# transform relative path into full path
train_filepaths = [ dataset_path + fp for fp in train_filepaths]
test_filepaths = [ dataset_path + fp for fp in test_filepaths]

# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels

all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]

# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)

# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)

# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]

# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])


# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )

print "input pipeline ready"

with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print "from the train set:"
  for i in range(20):
    print sess.run(train_label_batch)

  print "from the test set:"
  for i in range(10):
    print sess.run(test_label_batch)

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()