def _training_loss(
        features, labels, logits, loss_fn, weight_column_name=None,
        head_name=None):
    """Builds the example-weighted training loss and logs a loss summary.

    The loss used for gradients honours per-example weights:

        L = sum_{i} w_{i} * l_{i} / B

    where B is the number of examples in the batch, l_{i} is the unweighted
    loss of example i and w_{i} its example weight. This can differ from the
    value written to TensorBoard, which is the second value returned by
    `_loss` (the weighted average loss).

    Args:
      features: Features `dict`. Its values are passed as name-scope inputs,
        and it is searched for the weight column.
      labels: Either a `Tensor` for labels or, in the multihead case, a `dict`
        of string to `Tensor`.
      logits: Float `Tensor` of logits, shape `(batch_size, logits_dimension)`.
      loss_fn: Callable taking `(logits, labels)` and returning the raw,
        unweighted loss.
      weight_column_name: Key of the example-weight `Tensor` in `features`,
        if applicable.
      head_name: Head name, used to prefix the summary tag.

    Returns:
      The training loss `Output`.
    """
    scope_inputs = tuple(six.itervalues(features)) + (labels, logits)
    with ops.name_scope(None, "training_loss", scope_inputs) as scope_name:
        unweighted = loss_fn(logits, labels)
        example_weights = _weight_tensor(features, weight_column_name)
        training_loss, reported_loss = _loss(
            unweighted, example_weights, name=scope_name)
        summary.scalar(_head_prefixed(head_name, "loss"), reported_loss)
        return training_loss
def gradient_clipping(grads_and_vars):
    """Internal function for adaptive clipping.

    Rescales every gradient by one shared factor derived from the global
    norm of all gradients. The names `std_factor`, `decay`, `global_step`,
    `epsilon`, `name`, `report_summary` and `static_max_norm` are not defined
    here; they are presumably captured from an enclosing function.

    Args:
      grads_and_vars: Iterable of `(gradient, variable)` pairs. A gradient
        may be `None` or an `ops.IndexedSlices`.

    Returns:
      A list of `(clipped_gradient, variable)` pairs in the input order;
      `None` gradients stay `None`.
    """
    gradients, paired_vars = zip(*grads_and_vars)
    global_norm = clip_ops.global_norm(gradients)
    max_norm, log_mean = _adaptive_max_norm(
        global_norm, std_factor, decay, global_step, epsilon, name)

    if report_summary:
        # Expose the adaptive threshold for debugging.
        summary.scalar("global_norm/adaptive_max_gradient_norm", max_norm)

    # Factor is 1 while the norm stays below max_norm; otherwise scale the
    # gradients so their norm becomes exp(log_mean).
    scale = array_ops.where(global_norm < max_norm,
                            array_ops.ones_like(global_norm),
                            math_ops.exp(log_mean) / global_norm)
    if static_max_norm is not None:
        # Never let the global norm exceed the hard (static) cap.
        scale = math_ops.minimum(static_max_norm / global_norm, scale)

    def _apply_scale(grad):
        # Sparse gradients keep their indices; only the values are scaled.
        if grad is None:
            return None
        if isinstance(grad, ops.IndexedSlices):
            return ops.IndexedSlices(
                grad.values * scale, grad.indices, grad.dense_shape)
        return grad * scale

    scaled = [_apply_scale(grad) for grad in gradients]
    return list(zip(scaled, paired_vars))
def _centered_bias(num_label_columns):
    """Creates the centered-bias variable and one scalar summary per column.

    Args:
      num_label_columns: Number of label columns. The variable has shape
        `[num_label_columns]` and starts as `array_ops.zeros`.

    Returns:
      The centered-bias `Variable`, registered in the `_CENTERED_BIAS` and
      `ops.GraphKeys.GLOBAL_VARIABLES` collections.
    """
    centered_bias = variables.Variable(
        array_ops.zeros([num_label_columns]),
        collections=[_CENTERED_BIAS, ops.GraphKeys.GLOBAL_VARIABLES],
        name=_CENTERED_BIAS_WEIGHT)
    for i in range(num_label_columns):
        # Spaces are not legal in summary names (tf.summary rewrites them and
        # logs a message), so use the underscore form directly.
        summary.scalar("centered_bias_%d" % i, centered_bias[i])
    return centered_bias
def prefetch_queue(tensors,
                   capacity=8,
                   num_threads=1,
                   shared_name=None,
                   name=None):
    """Creates a queue that prefetches tensors from `tensors`.

    A `QueueRunner` that enqueues `tensors` into the returned queue is added
    automatically to the TF QueueRunners collection.

    Example:
      This is useful, for instance, to pre-assemble input batches read with
      `tf.train.batch()` and enqueue the assembled batches. Ops that dequeue
      from the prefetch queue then do not pay the cost of assembling a batch.

      images, labels = tf.train.batch([image, label], batch_size=32,
                                      num_threads=4)
      batch_queue = prefetch_queue([images, labels])
      images, labels = batch_queue.dequeue()
      logits = Net(images)
      loss = Loss(logits, labels)

    Args:
      tensors: A list or dictionary of `Tensors` to enqueue in the buffer.
      capacity: An integer. The maximum number of elements in the queue.
      num_threads: An integer. Number of threads running the enqueue op.
      shared_name: (Optional) If set, this queue will be shared under the
        given name across multiple sessions.
      name: (Optional) A name for the operations.

    Returns:
      A queue from which you can dequeue tensors with the same type and shape
      as `tensors`.
    """
    if not isinstance(tensors, dict):
        field_names = None
        tensor_list = tensors
    else:
        # Sort the keys so the component order is stable across runs; build
        # real lists because Python 3 dict methods return views.
        field_names = list(sorted(tensors.keys()))
        tensor_list = [tensors[key] for key in field_names]

    with ops.name_scope(name, "prefetch_queue", tensor_list) as name:
        queue = data_flow_ops.FIFOQueue(
            capacity=capacity,
            dtypes=[t.dtype for t in tensor_list],
            shapes=[t.get_shape() for t in tensor_list],
            names=field_names,
            shared_name=shared_name)
        enqueue_op = queue.enqueue(tensors)
        # One runner that executes the same enqueue op on num_threads threads.
        runner = queue_runner.QueueRunner(queue, [enqueue_op] * num_threads)
        queue_runner.add_queue_runner(runner)
        fill_fraction = math_ops.to_float(queue.size()) * (1. / capacity)
        summary.scalar("fraction_of_%d_full" % capacity, fill_fraction)
        return queue
def input_producer(input_tensor, element_shape=None, num_epochs=None,
                   shuffle=True, seed=None, capacity=32, shared_name=None,
                   summary_name=None, name=None):
    """Outputs the rows of `input_tensor` to a queue for an input pipeline.

    Args:
      input_tensor: A tensor with the rows to produce. Must be at least
        one-dimensional. Must either have a fully-defined shape, or
        `element_shape` must be defined.
      element_shape: (Optional.) A `TensorShape` giving the shape of one row
        of `input_tensor`, if it cannot be inferred.
      num_epochs: (Optional.) An integer. If specified, each row of
        `input_tensor` is produced `num_epochs` times before an `OutOfRange`
        error is generated. If not specified, the rows can be cycled through
        an unlimited number of times.
      shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
        within each epoch.
      seed: (Optional.) An integer. The seed to use if `shuffle` is true.
      capacity: (Optional.) The capacity of the buffering queue.
      shared_name: (Optional.) If set, this queue will be shared under the
        given name across multiple sessions.
      summary_name: (Optional.) If set, a scalar summary of the queue's fill
        fraction is generated, using this name as part of the tag.
      name: (Optional.) A name for the queue.

    Returns:
      A queue with the output rows. A `QueueRunner` for the queue is added to
      the current `QUEUE_RUNNER` collection of the current graph.

    Raises:
      ValueError: If the shape of the input cannot be inferred from the
        arguments.
    """
    with ops.name_scope(name, "input_producer", [input_tensor]):
        rows = ops.convert_to_tensor(input_tensor, name="input_tensor")
        # Shape of one row = static shape minus the leading (row) dimension,
        # refined with whatever the caller supplied.
        row_shape = rows.get_shape()[1:].merge_with(element_shape)
        if not row_shape.is_fully_defined():
            raise ValueError("Either `input_tensor` must have a fully defined shape "
                             "or `element_shape` must be specified")

        if shuffle:
            rows = random_ops.random_shuffle(rows, seed=seed)
        rows = limit_epochs(rows, num_epochs)

        producer_queue = data_flow_ops.FIFOQueue(
            capacity=capacity,
            dtypes=[rows.dtype.base_dtype],
            shapes=[row_shape],
            shared_name=shared_name,
            name=name)
        enqueue_op = producer_queue.enqueue_many([rows])
        queue_runner.add_queue_runner(
            queue_runner.QueueRunner(producer_queue, [enqueue_op]))

        if summary_name is not None:
            summary_tag = "queue/%s/%s" % (producer_queue.name, summary_name)
            summary.scalar(
                summary_tag,
                math_ops.cast(producer_queue.size(), dtypes.float32) *
                (1. / capacity))
        return producer_queue
def _conditional_batch(tensors, keep_input, batch_size, num_threads=10):
    """Conditionally enqueues `tensors` and dequeues them in batches.

    `tensors` is enqueued only when `keep_input` evaluates to `True`;
    otherwise the enqueue step is a no-op. `num_threads` runner threads
    execute this conditional enqueue.

    Args:
      tensors: List of tensors to enqueue. Every static shape must be fully
        defined.
      keep_input: Scalar boolean `Tensor`. Whether to enqueue or not.
      batch_size: Size of each dequeued batch; also used as the queue
        capacity.
      num_threads: Number of enqueueing threads.

    Returns:
      List of batched tensors (a list even when `tensors` has one element).

    Raises:
      ValueError: If `keep_input` is not rank 0, or if any tensor in
        `tensors` does not have a fully defined static shape.
    """
    keep_input.get_shape().assert_has_rank(0)

    # Collect static shapes and dtypes; the FIFO queue needs both.
    shapes_list = []
    dtypes_list = []
    for tensor in tensors:
        cur_shape = tensor.get_shape()
        cur_shape.assert_is_fully_defined()
        shapes_list.append(cur_shape)
        dtypes_list.append(tensor.dtype)

    final_q = data_flow_ops.FIFOQueue(capacity=batch_size,
                                      shapes=shapes_list,
                                      dtypes=dtypes_list,
                                      name='batched_queue')
    summary.scalar('queue/%s/size' % final_q.name, final_q.size())

    # Run the enqueue only when `keep_input` is True; otherwise run a no-op.
    conditional_enqueue = control_flow_ops.cond(
        keep_input,
        lambda: final_q.enqueue(tensors),
        control_flow_ops.no_op)
    queue_runner.add_queue_runner(queue_runner.QueueRunner(
        final_q, [conditional_enqueue] * num_threads))

    out_tensor = final_q.dequeue_many(batch_size)
    # Queues return a single tensor if the list of enqueued tensors has one
    # element. Always return a list so the type is the same in all cases.
    if isinstance(out_tensor, ops.Tensor):
        out_tensor = [out_tensor]
    return out_tensor
def __init__(self,
             examples,
             variables,
             options):
    """Create a new sdca optimizer.

    Validates the three configuration containers, stores them, creates the
    slots and the sharded hash table, and registers two scalar summaries.

    Args:
      examples: Dict-like container indexed by string keys; 'example_labels',
        'example_weights', 'example_ids', 'sparse_features' and
        'dense_features' are checked by `_assertSpecified`, and the last two
        also by `_assertList`.
      variables: Dict-like container; 'sparse_features_weights' and
        'dense_features_weights' are checked by `_assertSpecified` and
        `_assertList`.
      options: Dict-like container. 'loss_type' must be one of
        'logistic_loss', 'squared_loss', 'hinge_loss', 'smooth_hinge_loss'.
        'symmetric_l1_regularization' and 'symmetric_l2_regularization' must
        be non-negative.

    Raises:
      ValueError: If any of `examples`, `variables`, `options` is falsy
        (e.g. empty), if `loss_type` is unsupported, or if a regularization
        value is negative. The `_assert*` helpers (defined elsewhere) may
        raise as well.
    """
    if not examples or not variables or not options:
        raise ValueError('examples, variables and options must all be specified.')

    supported_losses = ('logistic_loss', 'squared_loss', 'hinge_loss',
                        'smooth_hinge_loss')
    loss_type = options['loss_type']
    if loss_type not in supported_losses:
        # Format the value into the message: passing it as a second
        # positional argument makes str(error) render as a tuple.
        raise ValueError('Unsupported loss_type: %s' % (loss_type,))

    self._assertSpecified(['example_labels', 'example_weights', 'example_ids',
                           'sparse_features', 'dense_features'], examples)
    self._assertList(['sparse_features', 'dense_features'], examples)

    self._assertSpecified(['sparse_features_weights', 'dense_features_weights'],
                          variables)
    self._assertList(['sparse_features_weights', 'dense_features_weights'],
                     variables)

    self._assertSpecified(['loss_type', 'symmetric_l2_regularization',
                           'symmetric_l1_regularization'], options)

    for name in ['symmetric_l1_regularization', 'symmetric_l2_regularization']:
        value = options[name]
        if value < 0.0:
            raise ValueError('%s should be non-negative. Found (%f)' %
                             (name, value))

    self._examples = examples
    self._variables = variables
    self._options = options
    self._create_slots()
    self._hashtable = ShardedMutableDenseHashTable(
        key_dtype=dtypes.int64,
        value_dtype=dtypes.float32,
        num_shards=self._num_table_shards(),
        default_value=[0.0, 0.0, 0.0, 0.0],
        # SdcaFprint never returns 0 or 1 for the low64 bits, so this is a
        # safe empty_key (that will never collide with actual payloads).
        empty_key=[0, 0])
    summary.scalar('approximate_duality_gap', self.approximate_duality_gap())
    summary.scalar('examples_seen', self._hashtable.size())
def _training_loss(self, features, labels, logits=None,
                   logits_input=None, name="training_loss"):
    """Returns the training loss tensor for this head.

    The loss used for gradients honours per-example weights:

        L = sum_{i} w_{i} * l_{i} / B

    where B is the number of examples in the batch, l_{i} is the unweighted
    loss of example i and w_{i} its example weight. This can differ from the
    weighted average loss written to TensorBoard.

    When centered bias is enabled, the centered bias is added to `logits`
    before the loss is computed.

    Args:
      features: Features dict; searched for `self._weight_column_name`.
      labels: Either a tensor for labels or, in the multihead case, a dict of
        string to labels tensor. Passed through `_check_labels` first.
      logits: Logits, a float tensor.
      logits_input: Output of the last hidden layer. Not used by this method.
      name: Op name passed to `_loss`.

    Returns:
      A tuple `(loss, centered_bias_step)`. `centered_bias_step` is `None`
      when centered bias is disabled, otherwise a one-element list holding
      the additional train op.
    """
    checked_labels = _check_labels(labels, self._label_name)

    bias_step = None
    if self._enable_centered_bias:
        bias = _centered_bias(self.logits_dimension,
                              self._centered_bias_weight_collection)
        logits = nn.bias_add(logits, bias)
        bias_step = [
            _centered_bias_step(self.logits_dimension,
                                self._centered_bias_weight_collection,
                                checked_labels,
                                self._train_loss_fn)
        ]

    raw_loss = self._train_loss_fn(logits, checked_labels)
    example_weights = _weight_tensor(features, self._weight_column_name)
    loss, weighted_average_loss = _loss(raw_loss, example_weights, name=name)
    summary.scalar(
        _head_prefixed(self._head_name, "loss"), weighted_average_loss)
    return loss, bias_step
def _centered_bias(logits_dimension):
    """Creates the centered bias variable plus one scalar summary per entry.

    Args:
      logits_dimension: Last dimension of `logits`. Must be >= 1.

    Returns:
      Centered bias `Variable` of shape `(logits_dimension,)`, obtained from
      `variable_scope.get_variable` with `init_ops.zeros_initializer` and
      `trainable=True`.

    Raises:
      ValueError: if `logits_dimension` is None or less than 1.
    """
    if (logits_dimension is None) or (logits_dimension < 1):
        raise ValueError("Invalid logits_dimension %s." % logits_dimension)
    centered_bias = variable_scope.get_variable(
        name="centered_bias_weight",
        shape=(logits_dimension,),
        initializer=init_ops.zeros_initializer,
        trainable=True)
    # One summary per bias entry so each can be tracked separately.
    for dim in range(logits_dimension):
        summary.scalar("centered_bias_%d" % dim, centered_bias[dim])
    return centered_bias
def _add_scalar_summary(tensor, tag=None):
    """Add a scalar summary operation for the tensor.

    Args:
      tensor: The tensor to summarize. Its static rank must be 0.
      tag: The tag to use. If falsy (None or empty), the tag
        "<tensor.op.name>_summary" is used.

    Returns:
      The created scalar summary.

    Raises:
      ValueError: If the static rank of `tensor` is not 0 (raised by
        `assert_has_rank`).
    """
    tensor.get_shape().assert_has_rank(0)
    tag = tag or "%s_summary" % tensor.op.name
    return summary.scalar(tag, tensor)
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
shared_name=None, name=None):
"""Runs a list of tensors to fill a queue to create batches of examples.
The `tensors_list` argument is a list of tuples of tensors, or a list of
dictionaries of tensors. Each element in the list is treated similarly
to the `tensors` argument of `tf.train.batch()`.
Enqueues a different list of tensors in different threads.
Implemented using a queue -- a `QueueRunner` for the queue
is added to the current `Graph`'s `QUEUE_RUNNER` collection.
`len(tensors_list)` threads will be started,
with thread `i` enqueuing the tensors from
`tensors_list[i]`. `tensors_list[i1][j]` must match
`tensors_list[i2][j]` in type and shape, except in the first
dimension if `enqueue_many` is true.
If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
to represent a single example. An input tensor `x` will be output as a
tensor with shape `[batch_size] + x.shape`.
If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
represent a batch of examples, where the first dimension is indexed
by example, and all members of `tensors_list[i]` should have the
same size in the first dimension. The slices of any input tensor
`x` are treated as examples, and the output tensors will have shape
`[batch_size] + x.shape[1:]`.
The `capacity` argument controls the how long the prefetching is allowed to
grow the queues.
The returned operation is a dequeue operation and will throw
`tf.errors.OutOfRangeError` if the input queue is exhausted. If this
operation is feeding another input queue, its queue runner will catch
this exception, however, if this operation is used in your main thread
you are responsible for catching this yourself.
*N.B.:* If `dynamic_pad` is `False`, you must ensure that either
(i) the `shapes` argument is passed, or (ii) all of the tensors in
`tensors_list` must have fully-defined shapes. `ValueError` will be
raised if neither of these conditions holds.
If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
tensors is known, but individual dimensions may have value `None`.
In this case, for each enqueue the dimensions with value `None`
may have a variable length; upon dequeue, the output tensors will be padded
on the right to the maximum shape of the tensors in the current minibatch.
For numbers, this padding takes value 0. For strings, this padding is
the empty string. See `PaddingFIFOQueue` for more info.
If `allow_smaller_final_batch` is `True`, a smaller batch value than
`batch_size` is returned when the queue is closed and there are not enough
elements to fill the batch, otherwise the pending elements are discarded.
In addition, all output tensors' static shapes, as accessed via the
`get_shape` method will have a first `Dimension` value of `None`, and
operations that depend on fixed batch_size would fail.
Args:
tensors_list: A list of tuples or dictionaries of tensors to enqueue.
batch_size: An integer. The new batch size pulled from the queue.
capacity: An integer. The maximum number of elements in the queue.
enqueue_many: Whether each tensor in `tensor_list_list` is a single
example.
shapes: (Optional) The shapes for each example. Defaults to the
inferred shapes for `tensor_list_list[i]`.
dynamic_pad: Boolean. Allow variable dimensions in input shapes.
The given dimensions are padded upon dequeue so that tensors within a
batch have the same shapes.
allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
batch to be smaller if there are insufficient items left in the queue.
shared_name: (Optional) If set, this queue will be shared under the given
name across multiple sessions.
name: (Optional) A name for the operations.
Returns:
A list or dictionary of tensors with the same number and types as
`tensors_list[i]`.
Raises:
ValueError: If the `shapes` are not specified, and cannot be
inferred from the elements of `tensor_list_list`.
"""
tensor_list_list = _as_tensor_list_list(tensors_list)
with ops.name_scope(name, "batch_join", _flatten(tensor_list_list)) as name:
tensor_list_list = _validate_join(tensor_list_list)
tensor_list_list, sparse_info = _store_sparse_tensors_join(
tensor_list_list, enqueue_many)
types = _dtypes(tensor_list_list)
shapes = _shapes(tensor_list_list, shapes, enqueue_many)
# TODO(josh11b,mrry): Switch to BatchQueue once it is written.
queue = _which_queue(dynamic_pad)(
capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
_enqueue_join(queue, tensor_list_list, enqueue_many)
summary.scalar("queue/%s/fraction_of_%d_full" % (queue.name, capacity),
math_ops.cast(queue.size(), dtypes.float32) *
(1. / capacity))
if allow_smaller_final_batch:
#.........这里部分代码省略.........
def queue_parsed_features(parsed_features,
keys=None,
feature_queue_capacity=100,
num_queue_runners=None,
num_enqueue_threads=None,
name=None):
"""Speeds up parsing by using queues to do it asynchronously.
This function adds the tensors in `parsed_features` to a queue, which allows
the parsing (or any other expensive op before this) to be asynchronous wrt the
rest of the training graph. This greatly improves read latency and speeds up
training since the data will already be parsed and ready when each step of
training needs it.
All queue runners are added to the queue runners collection, and may be
started via `start_queue_runners`.
All ops are added to the default graph.
Args:
parsed_features: A dict of string key to `Tensor` or `SparseTensor` objects.
keys: `Tensor` of string keys.
feature_queue_capacity: Capacity of the parsed features queue.
num_queue_runners: Deprecated. Defaults to 2 if this and
`num_enqueue_threads` are both `None`. This is the number of queue
runners to start for the feature queue. Adding multiple queue runners for
the parsed example queue helps maintain a full queue when the subsequent
computations overall are cheaper than parsing. This argument will be
deprecated and replaced with `num_enqueue_threads`.
num_enqueue_threads: Number of threads to enqueue the parsed example queue.
Using multiple threads to enqueue the parsed example queue helps maintain
a full queue when the subsequent computations overall are cheaper than
parsing. This argument will replace `num_queue_runners`. This and
`num_queue_runners` can not both be set.
name: Name of resulting op.
Returns:
Returns tuple of:
- `Tensor` corresponding to `keys` if provided, otherwise `None`.
- A dict of string key to `Tensor` or `SparseTensor` objects corresponding
to `parsed_features`.
Raises:
ValueError: for invalid inputs.
"""
num_queue_runners, num_enqueue_threads = _check_enqueue_params(
num_queue_runners, num_enqueue_threads)
args = list(parsed_features.values())
if keys is not None:
args += [keys]
with ops.name_scope(name, 'queue_parsed_features', args):
# Lets also add preprocessed tensors into the queue types for each item of
# the queue.
tensors_to_enqueue = []
# Each entry contains the key, and a boolean which indicates whether the
# tensor was a sparse tensor.
tensors_mapping = []
# TODO(sibyl-Aix6ihai): Most of the functionality here is about pushing sparse
# tensors into a queue. This could be taken care in somewhere else so others
# can reuse it. Also, QueueBase maybe extended to handle sparse tensors
# directly.
for key in sorted(parsed_features.keys()):
tensor = parsed_features[key]
if isinstance(tensor, sparse_tensor.SparseTensor):
tensors_mapping.append((key, True))
tensors_to_enqueue.extend([tensor.indices, tensor.values, tensor.shape])
else:
tensors_mapping.append((key, False))
tensors_to_enqueue.append(tensor)
if keys is not None:
tensors_to_enqueue.append(keys)
queue_dtypes = [x.dtype for x in tensors_to_enqueue]
input_queue = data_flow_ops.FIFOQueue(feature_queue_capacity, queue_dtypes)
# Add a summary op to debug if our feature queue is full or not.
summary.scalar('queue/parsed_features/%s/fraction_of_%d_full' %
(input_queue.name, feature_queue_capacity),
math_ops.cast(input_queue.size(), dtypes.float32) *
(1. / feature_queue_capacity))
# Add multiple queue runners so that the queue is always full. Adding more
# than two queue-runners may hog the cpu on the worker to fill up the queue.
#
# Note: this can result in large last batch being lost as the multiple queue
# runner threads do not coordinate with each other. Please use
# `num_enqueue_threads` instead.
if num_queue_runners is not None:
for _ in range(num_queue_runners):
queue_runner.add_queue_runner(
queue_runner.QueueRunner(
input_queue, [input_queue.enqueue(tensors_to_enqueue)],
queue_closed_exception_types=(errors.OutOfRangeError,
errors.CancelledError)))
# Use a single QueueRunner with multiple threads to enqueue so the queue is
# always full. The threads are coordinated so the last batch will not be
# lost.
elif num_enqueue_threads is not None:
#.........这里部分代码省略.........
#.........这里部分代码省略.........
seed: used to seed shuffling and reader starting points.
name: a scope name identifying the data.
enqueue_size: the number of rows to enqueue per step.
num_epochs: limit enqueuing to a specified number of epochs, if provided.
Returns:
A queue filled with the rows of the given array or `DataFrame`.
Raises:
TypeError: `data` is not a Pandas `DataFrame` or a numpy `ndarray`.
"""
with ops.name_scope(name):
if isinstance(data, np.ndarray):
types = [dtypes.int64, dtypes.as_dtype(data.dtype)]
queue_shapes = [(), data.shape[1:]]
get_feed_fn = _ArrayFeedFn
elif isinstance(data, collections.OrderedDict):
types = [dtypes.int64] + [dtypes.as_dtype(col.dtype)
for col in data.values()]
queue_shapes = [()] + [col.shape[1:] for col in data.values()]
get_feed_fn = _OrderedDictNumpyFeedFn
elif HAS_PANDAS and isinstance(data, pd.DataFrame):
types = [dtypes.as_dtype(dt)
for dt in [data.index.dtype] + list(data.dtypes)]
queue_shapes = [() for _ in types]
get_feed_fn = _PandasFeedFn
else:
raise TypeError(
"data must be either a numpy array or pandas DataFrame if pandas is "
"installed; got {}".format(type(data).__name__))
# TODO(jamieas): TensorBoard warnings for all warnings below once available.
if num_threads > 1 and num_epochs is not None:
logging.warning(
"enqueue_data was called with num_epochs and num_threads > 1. "
"num_epochs is applied per thread, so this will produce more "
"epochs than you probably intend. "
"If you want to limit epochs, use one thread.")
if shuffle and num_threads > 1 and num_epochs is not None:
logging.warning(
"enqueue_data was called with shuffle=True, num_threads > 1, and "
"num_epochs. This will create multiple threads, all reading the "
"array/dataframe in order adding to the same shuffling queue; the "
"results will likely not be sufficiently shuffled.")
if not shuffle and num_threads > 1:
logging.warning(
"enqueue_data was called with shuffle=False and num_threads > 1. "
"This will create multiple threads, all reading the "
"array/dataframe in order. If you want examples read in order, use"
" one thread; if you want multiple threads, enable shuffling.")
if shuffle:
min_after_dequeue = int(capacity / 4 if min_after_dequeue is None else
min_after_dequeue)
queue = data_flow_ops.RandomShuffleQueue(capacity,
min_after_dequeue,
dtypes=types,
shapes=queue_shapes,
seed=seed)
else:
min_after_dequeue = 0 # just for the summary text
queue = data_flow_ops.FIFOQueue(capacity,
dtypes=types,
shapes=queue_shapes)
enqueue_ops = []
feed_fns = []
for i in range(num_threads):
# Note the placeholders have no shapes, so they will accept any
# enqueue_size. enqueue_many below will break them up.
placeholders = [array_ops.placeholder(t) for t in types]
enqueue_ops.append(queue.enqueue_many(placeholders))
seed_i = None if seed is None else (i + 1) * seed
feed_fns.append(get_feed_fn(placeholders,
data,
enqueue_size,
random_start=shuffle,
seed=seed_i,
num_epochs=num_epochs))
runner = fqr.FeedingQueueRunner(queue=queue,
enqueue_ops=enqueue_ops,
feed_fns=feed_fns)
queue_runner.add_queue_runner(runner)
full = (math_ops.cast(
math_ops.maximum(0, queue.size() - min_after_dequeue),
dtypes.float32) * (1. / (capacity - min_after_dequeue)))
# Note that name contains a '/' at the end so we intentionally do not place
# a '/' after %s below.
summary_name = ("queue/%sfraction_over_%d_of_%d_full" %
(queue.name, min_after_dequeue,
capacity - min_after_dequeue))
summary.scalar(summary_name, full)
return queue
def bucket(tensors,
which_bucket,
batch_size,
num_buckets,
num_threads=1,
capacity=32,
shapes=None,
dynamic_pad=False,
allow_smaller_final_batch=False,
keep_input=None,
shared_name=None,
name=None):
"""Lazy bucketing of input tensors according to `which_bucket`.
The argument `tensors` can be a list or a dictionary of tensors.
The value returned by the function will be of the same type
as `tensors`.
The tensors entering this function are put into the bucket given by
`which_bucket`. Each bucket has its own queue. When a bucket contains
`batch_size` elements, this minibatch is pushed onto a top queue. The
tensors returned from this function are a the result of dequeueing the
next minibatch from this top queue.
This function is implemented using several queues. A `QueueRunner` for the
queues is added to the current `Graph`'s `QUEUE_RUNNER` collection.
As the returned tensors are the result of of a dequeue operation, evaluating
them will throw a `tf.errors.OutOfRangeError` when the input queue is
exhausted. If these tensors are feeding another input queue, its queue runner
will catch this exception, however, if they are used in your main thread
you are responsible for catching this yourself.
*N.B.:* If `dynamic_pad` is `False`, you must ensure that either
(i) the `shapes` argument is passed, or (ii) all of the tensors in
`tensors` must have fully-defined shapes. `ValueError` will be
raised if neither of these conditions holds.
If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
tensors is known, but individual dimensions may have shape `None`.
In this case, for each enqueue the dimensions with value `None`
may have a variable length; upon dequeue, the output tensors will be padded
on the right to the maximum shape of the tensors in the current minibatch.
For numbers, this padding takes value 0. For strings, this padding is
the empty string. See `PaddingFIFOQueue` for more info.
If `allow_smaller_final_batch` is `True`, a smaller batch value than
`batch_size` is returned when the queues are closed and there are not enough
elements to fill the batch, otherwise the pending elements are discarded.
In addition, all output tensors' static shapes, as accessed via the
`get_shape()` method will have a 0th `Dimension` value of `None`, and
operations that depend on fixed batch_size would fail.
Args:
tensors: The list or dictionary of tensors, representing a single element,
to bucket. Nested lists are not supported.
which_bucket: An `int32` scalar Tensor taking a value in `[0, num_buckets)`.
batch_size: The new batch size pulled from the queue
(python int or int32 scalar).
num_buckets: A python integer, the number of buckets.
num_threads: An integer. The number of threads enqueuing `tensors`.
capacity: An integer. The maximum number of minibatches in the top queue,
and also the maximum number of elements within each bucket.
shapes: (Optional) The shapes for each example. Defaults to the
inferred shapes for `tensors`.
dynamic_pad: Boolean. Allow variable dimensions in input shapes.
The given dimensions are padded upon dequeue so that tensors within a
batch have the same shapes.
allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
batches to be smaller if there are insufficient items left in the queues.
keep_input: (Optional). A `bool` scalar Tensor. If provided, this tensor
controls whether the input is added to the queue or not. If it evaluates
`True`, then `tensors` are added to the bucket; otherwise they are
dropped. This tensor essentially acts as a filtering mechanism.
The default behavior is to assume `keep_input=True`.
shared_name: (Optional). If set, the queues will be shared under the given
name across multiple sessions.
name: (Optional) A name for the operations.
Returns:
A tuple `(bucket, outputs)` where `bucket` is
a `int32` scalar tensor and `outputs` is a list or
dictionary of batched outputs corresponding to elements of `tensors`.
Every step will receive a new bucket of outputs.
Raises:
ValueError: If the `shapes` are not specified, and cannot be
inferred from the elements of `tensors`.
"""
tensor_list = _as_tensor_list(tensors)
with ops.name_scope(name, "bucket", tensor_list) as name:
tensor_list = _validate_bucket(tensor_list)
(tensor_list, sparse_info) = _store_sparse_tensors(
tensor_list, enqueue_many=False)
# Round-trip batch_size to a tensor, and possibly back
batch_size = ops.convert_to_tensor(
batch_size, dtype=dtypes.int32, name="batch_size")
static_batch_size = tensor_util.constant_value(batch_size)
batch_size = (
#.........这里部分代码省略.........
请发表评论