Source code for garage.tf.optimizers.penalty_lbfgs_optimizer

"""Penalized Limited-memory BFGS (L-BFGS) optimizer."""
from dowel import logger
import numpy as np
import scipy.optimize
import tensorflow as tf

from garage.tf.misc import tensor_utils
from garage.tf.optimizers.utils import LazyDict


class PenaltyLbfgsOptimizer:
    """Penalized Limited-memory BFGS (L-BFGS) optimizer.

    Performs constrained optimization via penalized L-BFGS. The penalty
    term is adaptively adjusted to make sure that the constraint is
    satisfied.

    Args:
        max_opt_itr (int): Maximum iteration for update.
        initial_penalty (float): Initial penalty.
        min_penalty (float): Minimum penalty allowed. Penalty will be
            clipped if lower than this value.
        max_penalty (float): Maximum penalty allowed. Penalty will be
            clipped if higher than this value.
        increase_penalty_factor (float): Factor to increase penalty in each
            penalty iteration.
        decrease_penalty_factor (float): Factor to decrease penalty in each
            penalty iteration.
        max_penalty_itr (int): Maximum penalty iterations to perform.
        adapt_penalty (bool): Whether the penalty is adaptive or not. If
            false, penalty will not change.

    """

    def __init__(self,
                 max_opt_itr=20,
                 initial_penalty=1.0,
                 min_penalty=1e-2,
                 max_penalty=1e6,
                 increase_penalty_factor=2,
                 decrease_penalty_factor=0.5,
                 max_penalty_itr=10,
                 adapt_penalty=True):
        # L-BFGS iteration budget for each penalty trial.
        self._max_opt_itr = max_opt_itr
        # Current penalty plus the schedule that adapts it between calls.
        self._penalty = initial_penalty
        self._initial_penalty = initial_penalty
        self._min_penalty = min_penalty
        self._max_penalty = max_penalty
        self._increase_penalty_factor = increase_penalty_factor
        self._decrease_penalty_factor = decrease_penalty_factor
        self._max_penalty_itr = max_penalty_itr
        self._adapt_penalty = adapt_penalty
        # These are populated by update_opt(); every public method guards
        # against being called before that setup happens.
        self._opt_fun = None
        self._target = None
        self._max_constraint_val = None
        self._constraint_name = None
[docs] def update_opt(self, loss, target, leq_constraint, inputs, constraint_name='constraint', name='PenaltyLbfgsOptimizer', **kwargs): """Construct operation graph for the optimizer. Args: loss (tf.Tensor): Loss objective to minimize. target (object): Target object to optimize. The object should implemenet `get_params()` and `get_param_values`. leq_constraint (tuple): It contains a tf.Tensor and a float value. The tf.Tensor represents the constraint term, and the float value is the constraint value. inputs (list[tf.Tensor]): List of input placeholders. constraint_name (str): Constraint name for logging. name (str): Name scope. kwargs (dict): Extra unused keyword arguments. Some optimizers have extra input, e.g. KL constraint. """ del kwargs params = target.get_params() with tf.name_scope(name): constraint_term, constraint_value = leq_constraint penalty_var = tf.compat.v1.placeholder(tf.float32, tuple(), name='penalty') penalized_loss = loss + penalty_var * constraint_term self._target = target self._max_constraint_val = constraint_value self._constraint_name = constraint_name def get_opt_output(): """Helper function to construct graph. Returns: list[tf.Tensor]: Penalized loss and gradient tensor. 
""" with tf.name_scope('get_opt_output'): grads = tf.gradients(penalized_loss, params) for idx, (grad, param) in enumerate(zip(grads, params)): if grad is None: grads[idx] = tf.zeros_like(param) flat_grad = tensor_utils.flatten_tensor_variables(grads) return [ tf.cast(penalized_loss, tf.float64), tf.cast(flat_grad, tf.float64), ] self._opt_fun = LazyDict( f_loss=lambda: tensor_utils.compile_function( inputs, loss, log_name='f_loss'), f_constraint=lambda: tensor_utils.compile_function( inputs, constraint_term, log_name='f_constraint'), f_penalized_loss=lambda: tensor_utils.compile_function( inputs=inputs + [penalty_var], outputs=[penalized_loss, loss, constraint_term], log_name='f_penalized_loss', ), f_opt=lambda: tensor_utils.compile_function( inputs=inputs + [penalty_var], outputs=get_opt_output(), ))
[docs] def loss(self, inputs): """The loss. Args: inputs (list[numpy.ndarray]): List of input values. Returns: float: Loss. Raises: Exception: If loss function is None, i.e. not defined. """ if self._opt_fun is None: raise Exception( 'Use update_opt() to setup the loss function first.') return self._opt_fun['f_loss'](*inputs)
[docs] def constraint_val(self, inputs): """The constraint value. Args: inputs (list[numpy.ndarray]): List of input values. Returns: float: Constraint value. Raises: Exception: If loss function is None, i.e. not defined. """ if self._opt_fun is None: raise Exception( 'Use update_opt() to setup the loss function first.') return self._opt_fun['f_constraint'](*inputs)
    def optimize(self, inputs, name='optimize'):
        """Perform optimization.

        Runs L-BFGS on the penalized loss, retrying with an adaptively
        scaled penalty until the constraint is satisfied (or the penalty
        iteration budget runs out), then writes the best parameters found
        back into the target.

        Args:
            inputs (list[numpy.ndarray]): List of input values.
            name (str): Name scope.

        Raises:
            Exception: If loss function is None, i.e. not defined.

        """
        if self._opt_fun is None:
            raise Exception(
                'Use update_opt() to setup the loss function first.')
        with tf.name_scope(name):

            inputs = tuple(inputs)

            # Start from the penalty carried over from the previous call,
            # clipped into the allowed range.
            try_penalty = np.clip(self._penalty, self._min_penalty,
                                  self._max_penalty)

            penalty_scale_factor = None
            f_opt = self._opt_fun['f_opt']
            f_penalized_loss = self._opt_fun['f_penalized_loss']

            def gen_f_opt(penalty):  # noqa: D202
                """Return a function that set parameters values.

                Args:
                    penalty (float): Penalty.

                Returns:
                    callable: Function that set parameters values.

                """

                def f(flat_params):
                    """Helper function to set parameters values.

                    Args:
                        flat_params (numpy.ndarray): Flatten parameter
                            values.

                    Returns:
                        list[tf.Tensor]: Penalized loss and gradient
                            tensor.

                    """
                    # Side effect: leaves the target holding flat_params,
                    # so L-BFGS evaluations also move the target.
                    self._target.set_param_values(flat_params)
                    return f_opt(*(inputs + (penalty, )))

                return f

            cur_params = self._target.get_param_values().astype('float64')
            # Fall back to the pre-optimization parameters if no trial
            # ever satisfies the constraint.
            opt_params = cur_params

            for penalty_itr in range(self._max_penalty_itr):
                logger.log('trying penalty=%.3f...' % try_penalty)

                itr_opt_params, _, _ = scipy.optimize.fmin_l_bfgs_b(
                    func=gen_f_opt(try_penalty),
                    x0=cur_params,
                    maxiter=self._max_opt_itr)

                _, try_loss, try_constraint_val = f_penalized_loss(*(
                    inputs + (try_penalty, )))

                logger.log('penalty %f => loss %f, %s %f' %
                           (try_penalty, try_loss, self._constraint_name,
                            try_constraint_val))

                # Either constraint satisfied, or we are at the last
                # iteration already and no alternative parameter satisfies
                # the constraint.
                # NOTE(review): opt_params is initialized to cur_params
                # above and never set to None, so the second disjunct can
                # never fire — the last-iteration fallback appears dead.
                # Left as-is to preserve behavior; confirm upstream.
                if try_constraint_val < self._max_constraint_val or \
                        (penalty_itr == self._max_penalty_itr - 1
                         and opt_params is None):
                    opt_params = itr_opt_params

                if not self._adapt_penalty:
                    break

                # Decide scale factor on the first iteration, or if
                # constraint violation yields numerical error.
                if (penalty_scale_factor is None
                        or np.isnan(try_constraint_val)):
                    # Increase penalty if constraint violated, or if
                    # constraint term is NAN.
                    if (try_constraint_val > self._max_constraint_val
                            or np.isnan(try_constraint_val)):
                        penalty_scale_factor = \
                            self._increase_penalty_factor
                    else:
                        # Otherwise (i.e. constraint satisfied), shrink
                        # penalty.
                        penalty_scale_factor = \
                            self._decrease_penalty_factor
                        opt_params = itr_opt_params
                else:
                    # Once the penalty has been scaled in one direction,
                    # stop as soon as the constraint crosses the boundary
                    # from the relevant side.
                    if (penalty_scale_factor > 1 and
                            try_constraint_val <=
                            self._max_constraint_val):
                        break
                    if (penalty_scale_factor < 1 and
                            try_constraint_val >=
                            self._max_constraint_val):
                        break
                try_penalty *= penalty_scale_factor
                try_penalty = np.clip(try_penalty, self._min_penalty,
                                      self._max_penalty)
                # Remember the adapted penalty for the next optimize()
                # call.
                self._penalty = try_penalty

            self._target.set_param_values(opt_params)
def __getstate__(self): """Object.__getstate__. Returns: dict: The state to be pickled for the instance. """ new_dict = self.__dict__.copy() del new_dict['_opt_fun'] return new_dict