Decouple it by creating a variable that stores the dequeued value, and then make downstream ops depend on this variable instead of the dequeue op. Advancing the queue then happens only during the assign.
Solution #1: fixed-size data, use Variables
# `image` is a single preprocessed image tensor; batch_size, image_size and
# color_channels must match the shape produced by tf.train.batch below
(image_batch_live,) = tf.train.batch([image], batch_size=batch_size,
                                     num_threads=1, capacity=614)
image_batch = tf.Variable(
    tf.zeros((batch_size, image_size, image_size, color_channels)),
    trainable=False,
    name="input_values_cached")
advance_batch = tf.assign(image_batch, image_batch_live)
Now image_batch gives the latest value from the queue without advancing it, and advance_batch advances the queue.
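As a minimal usage sketch of this pattern: it assumes the graph above is built, and loss_op stands in for any op that reads image_batch (the loop bound and loss_op are hypothetical, not part of the original pipeline):

sess = tf.Session()
sess.run(tf.global_variables_initializer())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

for step in range(100):
    sess.run(advance_batch)   # dequeue once and cache the batch in image_batch
    sess.run(loss_op)         # reads the cached batch; the queue does not move
    sess.run(loss_op)         # sees exactly the same data as the previous call

coord.request_stop()
coord.join(threads)

The queue advances only when advance_batch runs; every other fetch just reads the variable.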
Solution #2: variable-size data, use persistent Tensors
Here we decouple the workflow by introducing dequeue_op and dequeue_op2. All computation depends on dequeue_op2, which is fed the saved value of dequeue_op. Using get_session_tensor/get_session_handle ensures that the actual data stays inside the TensorFlow runtime, and the value passed through feed_dict is only a short string identifier. The API is a little awkward because of dummy_handle; I've brought up this issue here.
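Before the full example, here is a minimal sketch of just the handle round trip, with no queue involved (the names holder, restored and total are mine; the constant is arbitrary):

import tensorflow as tf

sess = tf.InteractiveSession()

# store a value inside the TensorFlow runtime; sess.run returns a handle object
# whose .handle attribute is a short string identifier, not the data itself
handle = sess.run(tf.get_session_handle(tf.constant([10, 20, 30])))

# build ops that read a persistent tensor back from a handle fed at run time
holder, restored = tf.get_session_tensor(handle, tf.int32)
total = tf.reduce_sum(restored)

print(sess.run(total, feed_dict={holder: handle.handle}))  # prints 60

The full example with the queue follows.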
import tensorflow as tf

def create_session():
    sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=3000))
    return sess

tf.reset_default_graph()
sess = create_session()

dt = tf.int32
# dummy handle is only needed to construct the get_session_tensor ops below
dummy_handle = sess.run(tf.get_session_handle(tf.constant(1)))

q = tf.FIFOQueue(capacity=20, dtypes=[dt])
enqueue_placeholder = tf.placeholder(dt, shape=[None])
enqueue_op = q.enqueue(enqueue_placeholder)
dequeue_op = q.dequeue()
size_op = q.size()

# dequeue_handle_op saves the dequeued value in the runtime and returns a handle;
# dequeue_op2 reads that saved value back when the handle is fed to dequeue_placeholder
dequeue_handle_op = tf.get_session_handle(dequeue_op)
dequeue_placeholder, dequeue_op2 = tf.get_session_tensor(dummy_handle, dt)

compute_op1 = tf.reduce_sum(dequeue_op2)
compute_op2 = tf.reduce_sum(dequeue_op2) + 1

# fill queue with variable size data
for i in range(10):
    sess.run(enqueue_op, feed_dict={enqueue_placeholder: [1]*(i+1)})
sess.run(q.close())

try:
    while True:
        dequeue_handle = sess.run(dequeue_handle_op)  # advance the queue
        val1 = sess.run(compute_op1, feed_dict={dequeue_placeholder: dequeue_handle.handle})
        val2 = sess.run(compute_op2, feed_dict={dequeue_placeholder: dequeue_handle.handle})
        size = sess.run(size_op)
        print("val1 %d, val2 %d, queue size %d" % (val1, val2, size))
except tf.errors.OutOfRangeError:
    print("Done")
You should see something like the following when you run it:
val1 1, val2 2, queue size 9
val1 2, val2 3, queue size 8
val1 3, val2 4, queue size 7
val1 4, val2 5, queue size 6
val1 5, val2 6, queue size 5
val1 6, val2 7, queue size 4
val1 7, val2 8, queue size 3
val1 8, val2 9, queue size 2
val1 9, val2 10, queue size 1
val1 10, val2 11, queue size 0
Done