Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
270 views
in Technique[技术] by (71.8m points)

python - What's going on in tf.train.shuffle_batch and `tf.train.batch?

I use Binary data to train a DNN.

But tf.train.shuffle_batch and tf.train.batchmake me confused.

This is my code and I will do some tests on it.

First Using_Queues_Lib.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 100
REAL32_BYTES=4


def read_dataset(filename_queue,data_length,label_length):
  class Record(object):
    pass
  result = Record()

  result_data  = data_length*REAL32_BYTES
  result_label = label_length*REAL32_BYTES
  record_bytes = result_data + result_label

  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  record_bytes = tf.decode_raw(value, tf.float32)
  result.data  = tf.strided_slice(record_bytes, [0],[data_length])#record_bytes: tf.float list
  result.label = tf.strided_slice(record_bytes, [data_length],[data_length+label_length])
  return result


def _generate_data_and_label_batch(data, label, min_queue_examples,batch_size, shuffle):
  num_preprocess_threads = 16   #only speed code
  if shuffle:
    data_batch, label_batch = tf.train.shuffle_batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size,min_after_dequeue=min_queue_examples)
  else:
    data_batch, label_batch = tf.train.batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size)
  return data_batch, label_batch

def inputs(data_dir, batch_size,data_length,label_length):
  filenames = [os.path.join(data_dir, 'test_data_SE.dat')]
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

  filename_queue = tf.train.string_input_producer(filenames)

  read_input = read_dataset(filename_queue,data_length,label_length)

  read_input.data.set_shape([data_length])   #important
  read_input.label.set_shape([label_length]) #important


  min_fraction_of_examples_in_queue = 0.4
  min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN *
                       min_fraction_of_examples_in_queue)
  print ('Filling queue with %d samples before starting to train. '
     'This will take a few minutes.' % min_queue_examples)

  return _generate_data_and_label_batch(read_input.data, read_input.label,
                                     min_queue_examples, batch_size,
                                     shuffle=True)

Second Using_Queues.py:

import Using_Queues_Lib
import tensorflow as tf
import numpy as np
import time


max_steps=10
batch_size=100
data_dir=r'.'
data_length=2
label_length=1

#-----------Save paras-----------
import struct
def WriteArrayFloat(file,data):
  fout=open(file,'wb')        
  fout.write(struct.pack('<'+str(data.flatten().size)+'f',
                                *data.flatten().tolist()))
  fout.close()
#-----------------------------

def add_layer(inputs, in_size, out_size, activation_function=None):
  Weights = tf.Variable(tf.truncated_normal([in_size, out_size]))
  biases = tf.Variable(tf.zeros([1, out_size]) + 0.1)
  Wx_plus_b = tf.matmul(inputs, Weights) + biases
  if activation_function is None:
    outputs = Wx_plus_b
  else:
    outputs = activation_function(Wx_plus_b)
  return outputs

data_train,labels_train=Using_Queues_Lib.inputs(data_dir=data_dir,
                      batch_size=batch_size,data_length=data_length,
                                          label_length=label_length)

xs=tf.placeholder(tf.float32,[None,data_length])
ys=tf.placeholder(tf.float32,[None,label_length])

l1 = add_layer(xs, data_length, 5, activation_function=tf.nn.sigmoid)
l2 = add_layer(l1, 5, 5, activation_function=tf.nn.sigmoid)
prediction = add_layer(l2, 5, label_length, activation_function=None)

loss = tf.reduce_mean(tf.square(ys - prediction))
train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss)

sess=tf.InteractiveSession()
tf.global_variables_initializer().run()

tf.train.start_queue_runners()

for i in range(max_steps):
  start_time=time.time()
  data_batch,label_batch=sess.run([data_train,labels_train])
  sess.run(train_step, feed_dict={xs: data_batch, ys: label_batch})
  duration=time.time()-start_time
  if i % 1 == 0:
    example_per_sec=batch_size/duration
    sec_pec_batch=float(duration)
    WriteArrayFloat(r'./data/'+str(i)+'.bin',
        np.concatenate((data_batch,label_batch),axis=1))
    format_str=('step %d,loss=%.8f(%.1f example/sec;%.3f sec/batch)')
    loss_value=sess.run(loss, feed_dict={xs: data_batch, ys: label_batch})
    print(format_str%(i,loss_value,example_per_sec,sec_pec_batch))

The data in here. And it generated by Mathematica.

data = Flatten@Table[{x, y, x*y}, {x, -1, 1, .05}, {y, -1, 1, .05}];
BinaryWrite[file, mydata, "Real32", ByteOrdering -> -1];
Close[file];

Length of data:1681

The data looks like this:

enter image description here

plot the data:The Red to Green color means the time when they occured in here

enter image description here

Run the Using_Queues.py,it will produce ten batch,and I draw each bach in this graph:(batch_size=100 and min_queue_examples=40) enter image description here

If batch_size=1024 and min_queue_examples=40: enter image description here

If batch_size=100 and min_queue_examples=4000: enter image description here

If batch_size=1024 and min_queue_examples=4000: enter image description here

And even If batch_size=1681 and min_queue_examples=4000: enter image description here

The region are not filled with points.

Why?

So why change the min_queue_examples make more random? How to determine the value min_queue_examples?

What's going on in tf.train.shuffle_batch?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

The sampling function that tf.train.shuffle_batch() (and hence tf.RandomShuffleQueue) uses is a bit subtle. The implementation uses tf.RandomShuffleQueue.dequeue_many(batch_size), whose (simplified) implementation is as follows:

  • While the number of elements dequeued is less than batch_size:
    • Wait until the queue contains at least min_after_dequeue + 1 elements.
    • Select an element from the queue uniformly at random, remove it from the queue, and add it the output batch.

The other thing to note is how elements are added to the queue, which uses a background thread running tf.RandomShuffleQueue.enqueue() on the same queue:

  • Wait until the current size of the queue is less than its capacity.
  • Add the element to the queue.

As a result, the capacity and min_after_dequeue properties of the queue (plus the distribution of the input data being enqueued) determine the population from which tf.train.shuffle_batch() will sample. It appears that the data in your input files is ordered, so you are relying completely on the tf.train.shuffle_batch() function for randomness.

Taking your visualizations in turn:

  1. If capacity and min_after_dequeue are small relative to the dataset, the "shuffling" will select random elements from a small population resembling a "sliding window" across the dataset. With some small probability you will see old elements in the dequeued batch.

  2. If batch_size is large and min_after_dequeue is small relative to the dataset, the "shuffling" will again be selecting from a small "sliding window" across the dataset.

  3. If min_after_dequeue is large relative to batch_size and the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  4. If min_after_dequeue and batch_size are large relative to the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  5. In the case where min_after_dequeue is 4000, and batch_size is 1681, note that the expected number of copies of each element in the queue when it samples is 4000 / 1681 = 2.38, so it more is likely that some elements will be sampled more than once (and less likely that you will sample each unique element exactly once).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...