TensorFlow Input Pipeline Study Notes
References:
TENSORFLOW INPUT PIPELINE EXAMPLE
The second reference is a Chinese translation of the first. The translation is mediocre, so read the original instead; it is not very long.
Below are the parts of the article I consider important, plus my own understanding.
TL;DR;
An example of an input pipeline, suitable for datasets that are not overly large.
Load Data in Tensorflow
An input pipeline can be understood as one way of loading data. There are generally two ways to load data: the traditional one is feeding, which does not work for large datasets because it requires loading all the data into memory. TensorFlow therefore provides the input pipeline mechanism.
An input pipeline handles CSV files, decodes the file format, restructures the data, shuffles it, performs data augmentation or other preprocessing, and then uses threads to load the data into batches.
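The memory difference between the two approaches can be pictured in plain Python, independent of TensorFlow (the file names and helper functions here are made up for illustration):

```python
# Feeding style: read the whole dataset into memory up front,
# then hand slices of it to the graph via feed_dict.
def load_all(lines):
    return [line.strip().split(",") for line in lines]

# Pipeline style: a generator yields one sample at a time,
# so memory use stays constant regardless of dataset size.
def stream(lines):
    for line in lines:
        yield line.strip().split(",")

data = ["img0.jpg,0\n", "img1.jpg,1\n", "img2.jpg,2\n"]

everything = load_all(data)   # whole dataset resident in memory
first = next(stream(data))    # only one sample materialized
print(everything[1])          # ['img1.jpg', '1']
print(first)                  # ['img0.jpg', '0']
```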
Load the Label Data
Make sure you use the correct dataset and CSV file paths.
Then process them to obtain the train and test labels.
Since we only read the data and do not actually train on it, we skip one-hot encoding and simply convert the label strings (which consist of digits anyway) to int.
def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)
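For reference, the one-hot encoding we skipped would look something like the sketch below (not part of the original example; num_classes=10 is an assumption for MNIST digits):

```python
def one_hot(label, num_classes=10):
    # A 1 at the label's index, 0 everywhere else.
    vec = [0] * num_classes
    vec[int(label)] = 1
    return vec

print(one_hot("3"))  # [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
```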
Do Some Optional Processing on Our String Lists
# transform relative path into full path
train_filepaths = [dataset_path + fp for fp in train_filepaths]
test_filepaths = [dataset_path + fp for fp in test_filepaths]
# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels
# we limit the number of files to 20 to make the output more clear!
all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]
Start Building the Pipeline
Make sure the tensor dtype matches the type of the data in the list.
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes

# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)
Let's Partition the Data
This is an optional step. Since our dataset may be fairly large, we first split it into a train set and a test set.

# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)
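What tf.dynamic_partition does here can be mimicked in plain Python: element i goes into output bucket partitions[i], preserving order within each bucket. A minimal sketch with made-up file names:

```python
def dynamic_partition(data, partitions, num_partitions):
    # Route data[i] into bucket partitions[i], like tf.dynamic_partition
    # (order within each bucket is preserved).
    out = [[] for _ in range(num_partitions)]
    for x, p in zip(data, partitions):
        out[p].append(x)
    return out

files = ["a.jpg", "b.jpg", "c.jpg", "d.jpg"]
partitions = [0, 1, 0, 1]  # 0 -> train, 1 -> test

train, test = dynamic_partition(files, partitions, 2)
print(train)  # ['a.jpg', 'c.jpg']
print(test)   # ['b.jpg', 'd.jpg']
```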
Build the Input Queues and Define How to Load Images
slice_input_producer slices the tensors into individual samples, which threads then queue up.
shuffle controls whether the data is shuffled; here we do not shuffle.
# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]
Group Samples into Batches
Training on single samples is inefficient; the usual practice is to merge several samples into a batch and train on them together. The number of samples in each batch is the batch_size.
So far we have only described the rough shape of the pipeline, but TensorFlow does not yet know the shape of our image data. Before merging samples into batches with tf.train.batch, we first have to define the shape of the image tensor.
# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])

# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
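Ignoring the queueing machinery, the grouping that tf.train.batch performs can be sketched in plain Python (tf.train.batch additionally stacks each group along a new batch dimension, and keeps pulling from the queue across epochs, so this is only the core idea):

```python
def batch(samples, batch_size):
    # Cut the sample stream into consecutive, complete groups of batch_size;
    # an incomplete trailing group is dropped in this simplified sketch.
    return [samples[i:i + batch_size]
            for i in range(0, len(samples) - batch_size + 1, batch_size)]

labels = [5, 4, 1, 9, 2, 1, 3, 1, 3, 6]
print(batch(labels, 5))  # [[5, 4, 1, 9, 2], [1, 3, 1, 3, 6]]
```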
Run the Queue Runners and Start a Session
At this point the input pipeline is fully built. To actually start it, however, we still need threads: they fill the queues and move the data into TensorFlow objects.
with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print("from the train set:")
  for i in range(20):
    print(sess.run(train_label_batch))

  print("from the test set:")
  for i in range(10):
    print(sess.run(test_label_batch))

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()
As the output below shows, TensorFlow does not keep track of epochs (one pass over the entire dataset is one epoch), so you have to count them yourself.
(tf-env)worker1:~$ python mnist_feed.py
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
input pipeline ready
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:900] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_init.cc:102] Found device 0 with properties:
name: GeForce GTX 960
major: 5 minor: 2 memoryClockRate (GHz) 1.253
pciBusID 0000:01:00.0
Total memory: 2.00GiB
Free memory: 1.77GiB
I tensorflow/core/common_runtime/gpu/gpu_init.cc:126] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_init.cc:136] 0: Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:755] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 960, pci bus id: 0000:01:00.0)
from the train set:
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
from the test set:
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
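Counting epochs by hand is simple bookkeeping. In this run the train set holds 15 samples (20 files minus the 5 test samples; notice the train labels repeat every 3 batches of 5), so a sketch of the bookkeeping might look like this:

```python
num_train_samples = 15        # 20 files minus the 5 test samples
batch_size = 5
steps_per_epoch = num_train_samples // batch_size  # 3

def epochs_completed(step):
    # Full passes over the train set after `step` batches have been consumed.
    return step // steps_per_epoch

print(epochs_completed(20))  # 6 full epochs after the 20 train-set reads above
```

Alternatively, if you want TensorFlow itself to stop after a fixed number of passes, tf.train.slice_input_producer accepts a num_epochs argument, after which the queue raises an OutOfRangeError.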
Complete Code for this example
# Example on how to use the tensorflow input pipelines. The explanation can be found here ischlag.github.io.
import tensorflow as tf
import random
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes

dataset_path = "/path/to/your/dataset/mnist/"
test_labels_file = "test-labels.csv"
train_labels_file = "train-labels.csv"

test_set_size = 5

IMAGE_HEIGHT = 28
IMAGE_WIDTH = 28
NUM_CHANNELS = 3
BATCH_SIZE = 5

def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)

# transform relative path into full path
train_filepaths = [dataset_path + fp for fp in train_filepaths]
test_filepaths = [dataset_path + fp for fp in test_filepaths]

# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels

# we limit the number of files to 20 to make the output more clear!
all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]

# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)

# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)

# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]

# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])

# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )

print("input pipeline ready")

with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print("from the train set:")
  for i in range(20):
    print(sess.run(train_label_batch))

  print("from the test set:")
  for i in range(10):
    print(sess.run(test_label_batch))

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()