References:
TENSORFLOW INPUT PIPELINE EXAMPLE
The second reference is a translated version of the first; the translation is mediocre, so I recommend reading the original, which is not very long.
Below are the parts of the article I consider most important, plus my own understanding.
TL;DR
An input pipeline example that works well for datasets that are not too large.
Load Data in Tensorflow
An input pipeline can be understood as one way of loading data. There are generally two ways to load data. The first is the traditional one: feeding the data in with feed_dict. If the dataset is large this approach no longer works well, because it requires keeping all of the data in memory. TensorFlow therefore also provides the input pipeline as a way of reading data.
The input pipeline reads the CSV file, decodes the file format, restructures the data, shuffles it, applies data augmentation or other preprocessing, and then uses threads to load the data into batches.
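For contrast, here is a minimal sketch of the traditional feeding approach; the placeholder names and dummy NumPy arrays are made up for illustration and are not part of the referenced article. The point is that everything passed through feed_dict already has to sit in memory as arrays.

import numpy as np
import tensorflow as tf

# hypothetical in-memory dataset: 100 dummy 28x28x3 images with integer labels
images_np = np.zeros((100, 28, 28, 3), dtype=np.float32)
labels_np = np.zeros((100,), dtype=np.int32)

# placeholders the graph reads from
images_ph = tf.placeholder(tf.float32, [None, 28, 28, 3])
labels_ph = tf.placeholder(tf.int32, [None])

# toy op standing in for a real model
mean_pixel = tf.reduce_mean(images_ph)

with tf.Session() as sess:
  # every step copies a slice of the in-memory arrays into the graph
  print(sess.run(mean_pixel, feed_dict={images_ph: images_np[:5],
                                        labels_ph: labels_np[:5]}))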
Load the Label Data
Make sure the dataset and the CSV file paths are correct.
Then parse the files to obtain the train and test labels.
Since we are only reading data here and not actually training, we skip one-hot encoding and simply convert the label strings (which consist of digit characters anyway) to int.
def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)
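As an aside, if one-hot labels were needed for actual training, the integer labels could be expanded inside the graph with tf.one_hot. A minimal sketch with made-up toy labels and an assumed 10 classes (MNIST-style digits):

import tensorflow as tf

NUM_CLASSES = 10  # assumption: MNIST-style digit labels 0-9

toy_labels = tf.constant([5, 0, 4, 1, 9], dtype=tf.int32)  # made-up labels
one_hot_labels = tf.one_hot(toy_labels, depth=NUM_CLASSES)

with tf.Session() as sess:
  print(sess.run(one_hot_labels))  # 5x10 matrix with a single 1.0 per row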
Do Some Optional Processing on Our String Lists
# transform relative path into full path
train_filepaths = [ dataset_path + fp for fp in train_filepaths]
test_filepaths = [ dataset_path + fp for fp in test_filepaths]

# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels

# we limit the number of files to 20 to make the output more clear!
all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]
Start Building the Pipeline
Make sure the dtype of the tensors matches the type of the data in the lists.
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes

# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)
Let's Partition the Data
This is an optional step. Because our dataset may be fairly large, we first split it into a train set and a test set.
# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)
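To make tf.dynamic_partition concrete, here is a small toy example (the values are made up): every element whose partition entry is 1 goes to the second output, the rest to the first.

import tensorflow as tf

data = tf.constant([10, 20, 30, 40, 50])
partitions = [0, 1, 0, 1, 0]  # 1 marks the (toy) test elements

train_part, test_part = tf.dynamic_partition(data, partitions, 2)

with tf.Session() as sess:
  print(sess.run(train_part))  # [10 30 50]
  print(sess.run(test_part))   # [20 40]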
Build the Input Queues and Define How to Load Images
slice_input_producer slices the tensors along their first dimension into individual examples, which threads then queue up.
The shuffle argument controls whether the data is shuffled; here we do not shuffle.
# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]
Group Samples into Batches
Training on a single sample at a time is inefficient. The usual practice is to group several samples into a batch and process them together; the number of samples per batch is the batch_size.
So far we have only described what the pipeline roughly looks like, but TensorFlow does not yet know the shape of our image data. Before grouping samples into batches with tf.train.batch, we first have to define the shape of the image tensors.
# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])

# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
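If shuffling at the batching stage were wanted instead (here we deliberately keep everything in order so the output below is easy to follow), tf.train.shuffle_batch is the usual alternative. A sketch that reuses the train_image and train_label tensors defined above; the capacity values are assumptions, not from the article:

# sketch only: shuffled batching; capacity and min_after_dequeue are assumed values
train_image_batch, train_label_batch = tf.train.shuffle_batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE,
                                    capacity=200,           # assumed queue capacity
                                    min_after_dequeue=100)  # assumed mixing threshold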
Run the Queue Runners and Start a Session
At this point the input pipeline is built, but to actually set it in motion we still need threads: they fill the queues and push the data into the TensorFlow objects.
with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print "from the train set:"
  for i in range(20):
    print sess.run(train_label_batch)

  print "from the test set:"
  for i in range(10):
    print sess.run(test_label_batch)

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()
As the output below shows, TensorFlow does not keep track of epochs for you (one pass over the entire dataset is one epoch), so you have to count them yourself.
(tf-env)worker1:~$ python mnist_feed.py
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
input pipeline ready
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:900] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_init.cc:102] Found device 0 with properties:
name: GeForce GTX 960
major: 5 minor: 2 memoryClockRate (GHz) 1.253
pciBusID 0000:01:00.0
Total memory: 2.00GiB
Free memory: 1.77GiB
I tensorflow/core/common_runtime/gpu/gpu_init.cc:126] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_init.cc:136] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:755] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 960, pci bus id: 0000:01:00.0)
from the train set:
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
[1 7 2 6 9]
[5 4 1 9 2]
[1 3 1 3 6]
from the test set:
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
[0 4 5 3 8]
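If you do want the pipeline to stop on its own after a fixed number of passes, slice_input_producer takes a num_epochs argument, and dequeuing then raises tf.errors.OutOfRangeError once the data is exhausted. A sketch under that assumption, reusing the train_images and train_labels tensors from above (the epoch count of 2 is arbitrary):

# sketch: limit the producer to 2 epochs; num_epochs is stored in a local
# variable, so the local variables must be initialized as well
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False,
                                    num_epochs=2)
train_label = train_input_queue[1]

with tf.Session() as sess:
  sess.run(tf.initialize_all_variables())
  sess.run(tf.initialize_local_variables())

  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)
  try:
    while not coord.should_stop():
      print(sess.run(train_label))  # one label per step, stops after 2 epochs
  except tf.errors.OutOfRangeError:
    print("done: 2 epochs consumed")
  finally:
    coord.request_stop()
    coord.join(threads)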
Complete Code for this example
# Example on how to use the tensorflow input pipelines. The explanation can be found here ischlag.github.io.
import tensorflow as tf
import random
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes

dataset_path = "/path/to/your/dataset/mnist/"
test_labels_file = "test-labels.csv"
train_labels_file = "train-labels.csv"

test_set_size = 5

IMAGE_HEIGHT = 28
IMAGE_WIDTH = 28
NUM_CHANNELS = 3
BATCH_SIZE = 5

def encode_label(label):
  return int(label)

def read_label_file(file):
  f = open(file, "r")
  filepaths = []
  labels = []
  for line in f:
    filepath, label = line.split(",")
    filepaths.append(filepath)
    labels.append(encode_label(label))
  return filepaths, labels

# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)

# transform relative path into full path
train_filepaths = [ dataset_path + fp for fp in train_filepaths]
test_filepaths = [ dataset_path + fp for fp in test_filepaths]

# for this example we will create our own test partition
all_filepaths = train_filepaths + test_filepaths
all_labels = train_labels + test_labels

all_filepaths = all_filepaths[:20]
all_labels = all_labels[:20]

# convert string into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)

# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)

# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)

# create input queues
train_input_queue = tf.train.slice_input_producer(
                                    [train_images, train_labels],
                                    shuffle=False)
test_input_queue = tf.train.slice_input_producer(
                                    [test_images, test_labels],
                                    shuffle=False)

# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]

file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]

# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])

# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
                                    [train_image, train_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )
test_image_batch, test_label_batch = tf.train.batch(
                                    [test_image, test_label],
                                    batch_size=BATCH_SIZE
                                    #,num_threads=1
                                    )

print "input pipeline ready"

with tf.Session() as sess:

  # initialize the variables
  sess.run(tf.initialize_all_variables())

  # initialize the queue threads to start to shovel data
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  print "from the train set:"
  for i in range(20):
    print sess.run(train_label_batch)

  print "from the test set:"
  for i in range(10):
    print sess.run(test_label_batch)

  # stop our queue threads and properly close the session
  coord.request_stop()
  coord.join(threads)
  sess.close()