Source code for garage.tf.optimizers.penalty_lbfgs_optimizer

from dowel import logger
import numpy as np
import scipy.optimize
import tensorflow as tf

from garage.tf.misc import tensor_utils
from garage.tf.optimizers.utils import LazyDict


class PenaltyLbfgsOptimizer:
    """Constrained optimizer based on penalized L-BFGS.

    The constrained problem ``min loss  s.t.  constraint <= epsilon`` is
    replaced by the unconstrained objective
    ``loss + penalty * constraint``, which is minimized with
    :func:`scipy.optimize.fmin_l_bfgs_b`. The penalty coefficient is
    adaptively rescaled between calls so that the constraint ends up
    satisfied.

    Args:
        max_opt_itr (int): Maximum number of L-BFGS iterations per penalty
            value.
        initial_penalty (float): Penalty coefficient used for the first
            attempt.
        min_penalty (float): Lower clipping bound for the penalty.
        max_penalty (float): Upper clipping bound for the penalty.
        increase_penalty_factor (float): Multiplier applied to the penalty
            when the constraint is violated.
        decrease_penalty_factor (float): Multiplier applied to the penalty
            when the constraint is satisfied.
        max_penalty_itr (int): Maximum number of penalty values to try in
            one call to :meth:`optimize`.
        adapt_penalty (bool): If False, only a single penalty value is
            tried per call to :meth:`optimize`.

    """

    def __init__(self,
                 max_opt_itr=20,
                 initial_penalty=1.0,
                 min_penalty=1e-2,
                 max_penalty=1e6,
                 increase_penalty_factor=2,
                 decrease_penalty_factor=0.5,
                 max_penalty_itr=10,
                 adapt_penalty=True):
        self._max_opt_itr = max_opt_itr
        self._penalty = initial_penalty
        self._initial_penalty = initial_penalty
        self._min_penalty = min_penalty
        self._max_penalty = max_penalty
        self._increase_penalty_factor = increase_penalty_factor
        self._decrease_penalty_factor = decrease_penalty_factor
        self._max_penalty_itr = max_penalty_itr
        self._adapt_penalty = adapt_penalty

        # Populated by update_opt().
        self._opt_fun = None
        self._target = None
        self._max_constraint_val = None
        self._constraint_name = None

    def update_opt(self,
                   loss,
                   target,
                   leq_constraint,
                   inputs,
                   constraint_name='constraint',
                   name=None,
                   *args,
                   **kwargs):
        """Set up the symbolic objective and the lazily compiled functions.

        Args:
            loss: Symbolic expression for the loss function.
            target: A parameterized object to optimize over. It must provide
                ``get_params(trainable=True)``,
                ``get_param_values(trainable=True)`` and
                ``set_param_values(values, trainable=True)``.
            leq_constraint (tuple): A constraint given as ``(f, epsilon)``,
                of the form ``f(*inputs) <= epsilon``.
            inputs (list): Symbolic variables used as inputs.
            constraint_name (str): Name of the constraint, used in log
                messages only.
            name (str): Optional name for the TensorFlow name scope.
            *args: Unused; accepted for interface compatibility.
            **kwargs: Unused; accepted for interface compatibility.

        """
        params = target.get_params(trainable=True)
        # tf.compat.v1.name_scope keeps the (name, default_name, values)
        # signature, consistent with the tf.compat.v1.placeholder used below.
        with tf.compat.v1.name_scope(name, 'PenaltyLbfgsOptimizer',
                                     [leq_constraint, loss, params]):
            constraint_term, constraint_value = leq_constraint
            penalty_var = tf.compat.v1.placeholder(tf.float32,
                                                   tuple(),
                                                   name='penalty')
            penalized_loss = loss + penalty_var * constraint_term

            self._target = target
            self._max_constraint_val = constraint_value
            self._constraint_name = constraint_name

            def get_opt_output():
                """Build the [loss, flat gradient] pair fed to L-BFGS."""
                with tf.compat.v1.name_scope(
                        'get_opt_output', values=[params, penalized_loss]):
                    grads = tf.gradients(penalized_loss, params)
                    # Parameters the loss does not depend on yield a None
                    # gradient; substitute zeros so flattening works.
                    for idx, (grad, param) in enumerate(zip(grads, params)):
                        if grad is None:
                            grads[idx] = tf.zeros_like(param)
                    flat_grad = tensor_utils.flatten_tensor_variables(grads)
                    # Cast to float64 before handing the values to scipy.
                    return [
                        tf.cast(penalized_loss, tf.float64),
                        tf.cast(flat_grad, tf.float64),
                    ]

            self._opt_fun = LazyDict(
                f_loss=lambda: tensor_utils.compile_function(
                    inputs, loss, log_name='f_loss'),
                f_constraint=lambda: tensor_utils.compile_function(
                    inputs, constraint_term, log_name='f_constraint'),
                f_penalized_loss=lambda: tensor_utils.compile_function(
                    inputs=inputs + [penalty_var],
                    outputs=[penalized_loss, loss, constraint_term],
                    log_name='f_penalized_loss',
                ),
                f_opt=lambda: tensor_utils.compile_function(
                    inputs=inputs + [penalty_var],
                    outputs=get_opt_output(),
                ))

    def _require_opt_fun(self):
        """Return the compiled-function table, or raise if not set up.

        Raises:
            Exception: If update_opt() has not been called yet.

        """
        if self._opt_fun is None:
            raise Exception(
                'Use update_opt() to setup the loss function first.')
        return self._opt_fun

    def loss(self, inputs):
        """Evaluate the (unpenalized) loss on the given inputs.

        Args:
            inputs (list): Values for the symbolic inputs passed to
                update_opt().

        Returns:
            The output of the compiled ``f_loss`` function.

        Raises:
            Exception: If update_opt() has not been called yet.

        """
        return self._require_opt_fun()['f_loss'](*inputs)

    def constraint_val(self, inputs):
        """Evaluate the constraint term on the given inputs.

        Args:
            inputs (list): Values for the symbolic inputs passed to
                update_opt().

        Returns:
            The output of the compiled ``f_constraint`` function.

        Raises:
            Exception: If update_opt() has not been called yet.

        """
        return self._require_opt_fun()['f_constraint'](*inputs)

    def optimize(self, inputs, name=None):
        """Run penalized L-BFGS and write the result into the target.

        Up to ``max_penalty_itr`` penalty values are tried. Each attempt
        starts L-BFGS from the parameters the target had on entry. The
        penalty is scaled up while the constraint is violated, or scaled
        down while it is satisfied, and the search stops once the
        constraint status flips.

        Args:
            inputs (list): Values for the symbolic inputs passed to
                update_opt().
            name (str): Optional name for the TensorFlow name scope.

        Raises:
            Exception: If update_opt() has not been called yet.

        """
        opt_fun = self._require_opt_fun()

        with tf.compat.v1.name_scope(name, 'optimize', values=[inputs]):
            inputs = tuple(inputs)

            try_penalty = np.clip(self._penalty, self._min_penalty,
                                  self._max_penalty)

            # None until the direction (grow/shrink) has been decided.
            penalty_scale_factor = None
            f_opt = opt_fun['f_opt']
            f_penalized_loss = opt_fun['f_penalized_loss']

            def gen_f_opt(penalty):
                """Return the scipy objective for a fixed penalty value."""

                def f(flat_params):
                    self._target.set_param_values(flat_params, trainable=True)
                    return f_opt(*(inputs + (penalty, )))

                return f

            cur_params = self._target.get_param_values(
                trainable=True).astype('float64')
            opt_params = cur_params

            for penalty_itr in range(self._max_penalty_itr):
                logger.log('trying penalty=%.3f...' % try_penalty)

                itr_opt_params, _, _ = scipy.optimize.fmin_l_bfgs_b(
                    func=gen_f_opt(try_penalty),
                    x0=cur_params,
                    maxiter=self._max_opt_itr)

                _, try_loss, try_constraint_val = f_penalized_loss(
                    *(inputs + (try_penalty, )))

                logger.log('penalty %f => loss %f, %s %f' %
                           (try_penalty, try_loss, self._constraint_name,
                            try_constraint_val))

                # Either constraint satisfied, or we are at the last iteration
                # already and no alternative parameter satisfies the
                # constraint.
                # NOTE(review): opt_params is initialised to cur_params, so
                # the `opt_params is None` clause can never be true; when no
                # penalty satisfies the constraint the target keeps its
                # starting parameters. Confirm this is the intended fallback.
                if try_constraint_val < self._max_constraint_val or \
                        (penalty_itr == self._max_penalty_itr - 1 and
                         opt_params is None):
                    opt_params = itr_opt_params

                if not self._adapt_penalty:
                    break

                # Decide scale factor on the first iteration, or if constraint
                # violation yields numerical error
                if (penalty_scale_factor is None
                        or np.isnan(try_constraint_val)):
                    # Increase penalty if constraint violated, or if constraint
                    # term is NAN
                    if (try_constraint_val > self._max_constraint_val
                            or np.isnan(try_constraint_val)):
                        penalty_scale_factor = self._increase_penalty_factor
                    else:
                        # Otherwise (i.e. constraint satisfied), shrink penalty
                        penalty_scale_factor = self._decrease_penalty_factor
                        opt_params = itr_opt_params
                else:
                    # Stop as soon as the constraint status flips relative to
                    # the direction in which the penalty is being scaled.
                    if (penalty_scale_factor > 1 and
                            try_constraint_val <= self._max_constraint_val):
                        break
                    elif (penalty_scale_factor < 1 and
                          try_constraint_val >= self._max_constraint_val):
                        break

                try_penalty *= penalty_scale_factor
                try_penalty = np.clip(try_penalty, self._min_penalty,
                                      self._max_penalty)
                # Remember the penalty so the next optimize() call starts
                # from it.
                self._penalty = try_penalty

            self._target.set_param_values(opt_params, trainable=True)

    def __getstate__(self):
        """Object.__getstate__.

        Returns:
            dict: The instance state without the compiled functions in
                ``_opt_fun``.

        """
        new_dict = self.__dict__.copy()
        del new_dict['_opt_fun']
        return new_dict

    def __setstate__(self, state):
        """Object.__setstate__.

        ``__getstate__`` drops ``_opt_fun``; restore it as None so that a
        restored instance reports "call update_opt() first" instead of
        failing with AttributeError.

        Args:
            state (dict): State produced by ``__getstate__``.

        """
        self.__dict__.update(state)
        self._opt_fun = None