Source code for garage.tf.algos.reps

"""Relative Entropy Policy Search implementation in Tensorflow."""
import collections

from dowel import logger, tabular
import numpy as np
import scipy.optimize
import tensorflow as tf

from garage import _Default, make_optimizer
from garage import log_performance, TrajectoryBatch
from garage.np.algos import RLAlgorithm
from garage.sampler import OnPolicyVectorizedSampler
from garage.tf import paths_to_tensors
from garage.tf.misc import tensor_utils
from garage.tf.misc.tensor_utils import flatten_inputs
from garage.tf.misc.tensor_utils import graph_inputs
from garage.tf.optimizers import LbfgsOptimizer
from garage.tf.samplers import BatchSampler


# pylint: disable=differing-param-doc, differing-type-doc
class REPS(RLAlgorithm):  # noqa: D416
    """Relative Entropy Policy Search.

    References
    ----------
    [1] J. Peters, K. Mulling, and Y. Altun, "Relative Entropy Policy
        Search," Artif. Intell., pp. 1607-1612, 2008.

    Example:
        $ python garage/examples/tf/reps_gym_cartpole.py

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        policy (garage.tf.policies.StochasticPolicy): Policy.
        baseline (garage.tf.baselines.Baseline): The baseline.
        scope (str): Scope for identifying the algorithm. Must be specified
            if running multiple algorithms simultaneously, each using
            different environments and policies.
        max_path_length (int): Maximum length of a single rollout.
        discount (float): Discount.
        gae_lambda (float): Lambda used for generalized advantage
            estimation.
        center_adv (bool): Whether to rescale the advantages so that they
            have mean 0 and standard deviation 1.
        positive_adv (bool): Whether to shift the advantages so that they
            are always positive. When used in conjunction with center_adv
            the advantages will be standardized before shifting.
        fixed_horizon (bool): Whether to fix horizon.
        epsilon (float): Dual func parameter.
        l2_reg_dual (float): Coefficient for dual func l2 regularization.
        l2_reg_loss (float): Coefficient for policy loss l2 regularization.
        optimizer (object): The optimizer of the algorithm. Should be the
            optimizers in garage.tf.optimizers.
        optimizer_args (dict): Arguments of the optimizer.
        dual_optimizer (object): Dual func optimizer.
        dual_optimizer_args (dict): Arguments of the dual optimizer.
        name (str): Name of the algorithm.

    """

    def __init__(self,
                 env_spec,
                 policy,
                 baseline,
                 max_path_length=500,
                 discount=0.99,
                 gae_lambda=1,
                 center_adv=True,
                 positive_adv=False,
                 fixed_horizon=False,
                 epsilon=0.5,
                 l2_reg_dual=0.,
                 l2_reg_loss=0.,
                 optimizer=LbfgsOptimizer,
                 optimizer_args=None,
                 dual_optimizer=scipy.optimize.fmin_l_bfgs_b,
                 dual_optimizer_args=None,
                 name='REPS'):
        optimizer_args = optimizer_args or dict(max_opt_itr=_Default(50))
        dual_optimizer_args = dual_optimizer_args or dict(maxiter=50)

        self.policy = policy
        self.max_path_length = max_path_length

        self._env_spec = env_spec
        self._baseline = baseline
        self._discount = discount
        self._gae_lambda = gae_lambda
        self._center_adv = center_adv
        self._positive_adv = positive_adv
        self._fixed_horizon = fixed_horizon
        self._flatten_input = True

        self._name = name
        self._name_scope = tf.name_scope(self._name)
        self._old_policy = policy.clone('old_policy')

        self._feat_diff = None
        self._param_eta = None
        self._param_v = None
        self._f_dual = None
        self._f_dual_grad = None
        self._f_policy_kl = None
        self._policy_network = None
        self._old_policy_network = None

        self._optimizer = make_optimizer(optimizer, **optimizer_args)
        self._dual_optimizer = dual_optimizer
        self._dual_optimizer_args = dual_optimizer_args
        self._epsilon = float(epsilon)
        self._l2_reg_dual = float(l2_reg_dual)
        self._l2_reg_loss = float(l2_reg_loss)

        self._episode_reward_mean = collections.deque(maxlen=100)

        if policy.vectorized:
            self.sampler_cls = OnPolicyVectorizedSampler
        else:
            self.sampler_cls = BatchSampler

        self.init_opt()
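
    # Per-iteration optimization happens in two stages (see optimize_policy):
    # the dual variables (eta, v) are fit with the scipy dual optimizer on
    # sampled feature differences, then the policy is updated by the
    # LBFGS-based optimizer on the weighted likelihood loss built in
    # _build_policy_loss.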

    def init_opt(self):
        """Initialize the optimization procedure."""
        pol_loss_inputs, pol_opt_inputs, dual_opt_inputs = \
            self._build_inputs()
        self._policy_opt_inputs = pol_opt_inputs
        self._dual_opt_inputs = dual_opt_inputs

        pol_loss = self._build_policy_loss(pol_loss_inputs)
        self._optimizer.update_opt(loss=pol_loss,
                                   target=self.policy,
                                   inputs=flatten_inputs(
                                       self._policy_opt_inputs))

    def train(self, runner):
        """Obtain samplers and start actual training for each epoch.

        Args:
            runner (LocalRunner): LocalRunner is passed to give algorithm
                the access to runner.step_epochs(), which provides services
                such as snapshotting and sampler control.

        Returns:
            float: The average return in last epoch cycle.

        """
        last_return = None

        for _ in runner.step_epochs():
            runner.step_path = runner.obtain_samples(runner.step_itr)
            last_return = self.train_once(runner.step_itr, runner.step_path)
            runner.step_itr += 1

        return last_return

    def train_once(self, itr, paths):
        """Perform one step of policy optimization given one batch of samples.

        Args:
            itr (int): Iteration number.
            paths (list[dict]): A list of collected paths.

        Returns:
            numpy.float64: Average return.

        """
        # -- Stage: Calculate baseline
        paths = [
            dict(
                observations=self._env_spec.observation_space.flatten_n(
                    path['observations'])
                if self._flatten_input else path['observations'],
                actions=(
                    self._env_spec.action_space.flatten_n(  # noqa: E126
                        path['actions'])),
                rewards=path['rewards'],
                env_infos=path['env_infos'],
                agent_infos=path['agent_infos'],
                dones=path['dones']) for path in paths
        ]

        if hasattr(self._baseline, 'predict_n'):
            baseline_predictions = self._baseline.predict_n(paths)
        else:
            baseline_predictions = [
                self._baseline.predict(path) for path in paths
            ]

        # -- Stage: Pre-process samples based on collected paths
        samples_data = paths_to_tensors(paths, self.max_path_length,
                                        baseline_predictions, self._discount,
                                        self._gae_lambda)

        # -- Stage: Run and calculate performance of the algorithm
        undiscounted_returns = log_performance(
            itr,
            TrajectoryBatch.from_trajectory_list(self._env_spec, paths),
            discount=self._discount)
        self._episode_reward_mean.extend(undiscounted_returns)
        tabular.record('Extras/EpisodeRewardMean',
                       np.mean(self._episode_reward_mean))
        samples_data['average_return'] = np.mean(undiscounted_returns)

        self.log_diagnostics(samples_data)
        logger.log('Optimizing policy...')
        self.optimize_policy(samples_data)
        return samples_data['average_return']
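
    # train_once expects each path dict to carry 'observations', 'actions',
    # 'rewards', 'env_infos', 'agent_infos' and 'dones'; paths_to_tensors
    # pads them to max_path_length and produces the 'valids' mask consumed
    # by _policy_opt_input_values and _dual_opt_input_values.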

    def log_diagnostics(self, paths):
        """Log diagnostic information.

        Args:
            paths (list[dict]): A list of collected paths.

        """
        logger.log('Logging diagnostics...')
        self.policy.log_diagnostics(paths)
        self._baseline.log_diagnostics(paths)

    def __getstate__(self):
        """Parameters to save in snapshot.

        Returns:
            dict: Parameters to save.

        """
        data = self.__dict__.copy()
        del data['_name_scope']
        del data['_policy_opt_inputs']
        del data['_dual_opt_inputs']
        del data['_f_dual']
        del data['_f_dual_grad']
        del data['_f_policy_kl']
        del data['_policy_network']
        del data['_old_policy_network']
        return data

    def __setstate__(self, state):
        """Parameters to restore from snapshot.

        Args:
            state (dict): Parameters to restore from.

        """
        self.__dict__ = state
        self._name_scope = tf.name_scope(self._name)
        self.init_opt()

    def optimize_policy(self, samples_data):
        """Optimize the policy using the samples.

        Args:
            samples_data (dict): Processed sample data.
                See garage.tf.paths_to_tensors() for details.

        """
        # Initial BFGS parameter values.
        x0 = np.hstack([self._param_eta, self._param_v])
        # Set parameter boundaries: \eta>=1e-12, v unrestricted.
        bounds = [(-np.inf, np.inf) for _ in x0]
        bounds[0] = (1e-12, np.inf)

        # Optimize dual
        eta_before = self._param_eta
        logger.log('Computing dual before')
        self._feat_diff = self._features(samples_data)
        dual_opt_input_values = self._dual_opt_input_values(samples_data)
        dual_before = self._f_dual(*dual_opt_input_values)
        logger.log('Optimizing dual')

        def eval_dual(x):
            """Evaluate dual function loss.

            Args:
                x (numpy.ndarray): Input to dual function.

            Returns:
                numpy.float64: Dual function loss.

            """
            self._param_eta = x[0]
            self._param_v = x[1:]
            dual_opt_input_values = self._dual_opt_input_values(samples_data)
            return self._f_dual(*dual_opt_input_values)

        def eval_dual_grad(x):
            """Evaluate gradient of dual function loss.

            Args:
                x (numpy.ndarray): Input to dual function.

            Returns:
                numpy.ndarray: Gradient of dual function loss.

            """
            self._param_eta = x[0]
            self._param_v = x[1:]
            dual_opt_input_values = self._dual_opt_input_values(samples_data)
            grad = self._f_dual_grad(*dual_opt_input_values)
            # np.float was a deprecated alias of the builtin float.
            eta_grad = float(grad[0])
            v_grad = grad[1]
            return np.hstack([eta_grad, v_grad])

        params_ast, _, _ = self._dual_optimizer(func=eval_dual,
                                                x0=x0,
                                                fprime=eval_dual_grad,
                                                bounds=bounds,
                                                **self._dual_optimizer_args)

        logger.log('Computing dual after')
        self._param_eta, self._param_v = params_ast[0], params_ast[1:]
        dual_opt_input_values = self._dual_opt_input_values(samples_data)
        dual_after = self._f_dual(*dual_opt_input_values)

        # Optimize policy
        policy_opt_input_values = self._policy_opt_input_values(samples_data)
        logger.log('Computing policy loss before')
        loss_before = self._optimizer.loss(policy_opt_input_values)
        logger.log('Computing policy KL before')
        policy_kl_before = self._f_policy_kl(*policy_opt_input_values)
        logger.log('Optimizing policy')
        self._optimizer.optimize(policy_opt_input_values)
        logger.log('Computing policy KL')
        policy_kl = self._f_policy_kl(*policy_opt_input_values)
        logger.log('Computing policy loss after')
        loss_after = self._optimizer.loss(policy_opt_input_values)

        tabular.record('EtaBefore', eta_before)
        tabular.record('EtaAfter', self._param_eta)
        tabular.record('DualBefore', dual_before)
        tabular.record('DualAfter', dual_after)
        tabular.record('{}/LossBefore'.format(self.policy.name), loss_before)
        tabular.record('{}/LossAfter'.format(self.policy.name), loss_after)
        tabular.record('{}/dLoss'.format(self.policy.name),
                       loss_before - loss_after)
        tabular.record('{}/KLBefore'.format(self.policy.name),
                       policy_kl_before)
        tabular.record('{}/KL'.format(self.policy.name), policy_kl)

        self._old_policy.model.parameters = self.policy.model.parameters
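
    # Up to the L2 regularizer and a max-subtraction used for numerical
    # stability (see _build_policy_loss), the dual objective minimized
    # above is
    #
    #     g(eta, v) = eta * epsilon
    #                 + eta * log(mean_t exp(delta_v(t) / eta)),
    #     delta_v(t) = r_t + v . (phi(s_{t+1}) - phi(s_t)),
    #
    # and the policy loss is the exp(delta_v / eta)-weighted negative
    # log-likelihood of the sampled actions.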

    def _build_inputs(self):
        """Build input variables.

        Returns:
            namedtuple: Collection of variables to compute policy loss.
            namedtuple: Collection of variables to do policy optimization.

        """
        observation_space = self.policy.observation_space
        action_space = self.policy.action_space

        with tf.name_scope('inputs'):
            obs_var = observation_space.to_tf_placeholder(
                name='obs', batch_dims=2)  # yapf: disable
            action_var = action_space.to_tf_placeholder(
                name='action', batch_dims=2)  # yapf: disable
            reward_var = tensor_utils.new_tensor(
                name='reward', ndim=2, dtype=tf.float32)  # yapf: disable
            valid_var = tensor_utils.new_tensor(
                name='valid', ndim=2, dtype=tf.float32)  # yapf: disable
            feat_diff = tensor_utils.new_tensor(
                name='feat_diff', ndim=2, dtype=tf.float32)  # yapf: disable
            param_v = tensor_utils.new_tensor(
                name='param_v', ndim=1, dtype=tf.float32)  # yapf: disable
            param_eta = tensor_utils.new_tensor(
                name='param_eta', ndim=0, dtype=tf.float32)  # yapf: disable
            policy_state_info_vars = {
                k: tf.compat.v1.placeholder(
                    tf.float32, shape=[None] * 2 + list(shape), name=k)
                for k, shape in self.policy.state_info_specs
            }  # yapf: disable
            policy_state_info_vars_list = [
                policy_state_info_vars[k] for k in self.policy.state_info_keys
            ]  # yapf: disable

        self._policy_network = self.policy.build(obs_var, name='policy')
        self._old_policy_network = self._old_policy.build(obs_var,
                                                          name='policy')

        policy_loss_inputs = graph_inputs(
            'PolicyLossInputs',
            obs_var=obs_var,
            action_var=action_var,
            reward_var=reward_var,
            valid_var=valid_var,
            feat_diff=feat_diff,
            param_eta=param_eta,
            param_v=param_v,
            policy_state_info_vars=policy_state_info_vars,
        )
        policy_opt_inputs = graph_inputs(
            'PolicyOptInputs',
            obs_var=obs_var,
            action_var=action_var,
            reward_var=reward_var,
            valid_var=valid_var,
            feat_diff=feat_diff,
            param_eta=param_eta,
            param_v=param_v,
            policy_state_info_vars_list=policy_state_info_vars_list,
        )
        dual_opt_inputs = graph_inputs(
            'DualOptInputs',
            reward_var=reward_var,
            valid_var=valid_var,
            feat_diff=feat_diff,
            param_eta=param_eta,
            param_v=param_v,
            policy_state_info_vars_list=policy_state_info_vars_list,
        )

        return policy_loss_inputs, policy_opt_inputs, dual_opt_inputs

    def _build_policy_loss(self, i):
        """Build policy loss and other output tensors.

        Args:
            i (namedtuple): Collection of variables to compute policy loss.

        Returns:
            tf.Tensor: Policy loss.
            tf.Tensor: Mean policy KL divergence.

        Raises:
            NotImplementedError: If is_recurrent is True.

        """
        pol_dist = self._policy_network.dist
        old_pol_dist = self._old_policy_network.dist

        # Initialize dual params
        self._param_eta = 15.
        self._param_v = np.random.rand(
            self._env_spec.observation_space.flat_dim * 2 + 4)

        with tf.name_scope('bellman_error'):
            delta_v = tf.boolean_mask(i.reward_var,
                                      i.valid_var) + tf.tensordot(
                                          i.feat_diff, i.param_v, 1)

        with tf.name_scope('policy_loss'):
            ll = pol_dist.log_prob(i.action_var)
            ll = tf.boolean_mask(ll, i.valid_var)
            loss = -tf.reduce_mean(
                ll * tf.exp(delta_v / i.param_eta -
                            tf.reduce_max(delta_v / i.param_eta)))

            reg_params = self.policy.get_regularizable_vars()
            loss += self._l2_reg_loss * tf.reduce_sum(
                [tf.reduce_mean(tf.square(param))
                 for param in reg_params]) / len(reg_params)

        with tf.name_scope('kl'):
            kl = old_pol_dist.kl_divergence(pol_dist)
            pol_mean_kl = tf.reduce_mean(kl)

        with tf.name_scope('dual'):
            dual_loss = i.param_eta * self._epsilon + (
                i.param_eta * tf.math.log(
                    tf.reduce_mean(
                        tf.exp(delta_v / i.param_eta -
                               tf.reduce_max(delta_v / i.param_eta)))) +
                i.param_eta * tf.reduce_max(delta_v / i.param_eta))

            dual_loss += self._l2_reg_dual * (tf.square(i.param_eta) +
                                              tf.square(1 / i.param_eta))

            dual_grad = tf.gradients(dual_loss, [i.param_eta, i.param_v])

        # yapf: disable
        self._f_dual = tensor_utils.compile_function(
            flatten_inputs(self._dual_opt_inputs),
            dual_loss,
            log_name='f_dual')
        # yapf: enable

        self._f_dual_grad = tensor_utils.compile_function(
            flatten_inputs(self._dual_opt_inputs),
            dual_grad,
            log_name='f_dual_grad')

        self._f_policy_kl = tensor_utils.compile_function(
            flatten_inputs(self._policy_opt_inputs),
            pol_mean_kl,
            log_name='f_policy_kl')

        return loss

    def _dual_opt_input_values(self, samples_data):
        """Update dual func optimize input values based on samples data.

        Args:
            samples_data (dict): Processed sample data.
                See garage.tf.paths_to_tensors() for details.

        Returns:
            list(np.ndarray): Flatten dual function optimization input values.

        """
        policy_state_info_list = [
            samples_data['agent_infos'][k] for k in self.policy.state_info_keys
        ]  # yapf: disable

        # pylint: disable=unexpected-keyword-arg
        dual_opt_input_values = self._dual_opt_inputs._replace(
            reward_var=samples_data['rewards'],
            valid_var=samples_data['valids'],
            feat_diff=self._feat_diff,
            param_eta=self._param_eta,
            param_v=self._param_v,
            policy_state_info_vars_list=policy_state_info_list,
        )

        return flatten_inputs(dual_opt_input_values)

    def _policy_opt_input_values(self, samples_data):
        """Update policy optimize input values based on samples data.

        Args:
            samples_data (dict): Processed sample data.
                See garage.tf.paths_to_tensors() for details.

        Returns:
            list(np.ndarray): Flatten policy optimization input values.

        """
        policy_state_info_list = [
            samples_data['agent_infos'][k] for k in self.policy.state_info_keys
        ]  # yapf: disable

        # pylint: disable=unexpected-keyword-arg
        policy_opt_input_values = self._policy_opt_inputs._replace(
            obs_var=samples_data['observations'],
            action_var=samples_data['actions'],
            reward_var=samples_data['rewards'],
            valid_var=samples_data['valids'],
            feat_diff=self._feat_diff,
            param_eta=self._param_eta,
            param_v=self._param_v,
            policy_state_info_vars_list=policy_state_info_list,
        )

        return flatten_inputs(policy_opt_input_values)

    def _features(self, samples_data):
        """Get valid view features based on samples data.

        Args:
            samples_data (dict): Processed sample data.
                See garage.tf.paths_to_tensors() for details.

        Returns:
            numpy.ndarray: Features for training.

        """
        paths = samples_data['paths']
        feat_diff = []
        for path in paths:
            o = np.clip(path['observations'],
                        self._env_spec.observation_space.low,
                        self._env_spec.observation_space.high)
            lr = len(path['rewards'])
            al = np.arange(lr).reshape(-1, 1) / self.max_path_length
            feats = np.concatenate(
                [o, o**2, al, al**2, al**3,
                 np.ones((lr, 1))], axis=1)
            # pylint: disable=unsubscriptable-object
            feats = np.vstack([feats, np.zeros(feats.shape[1])])
            feat_diff.append(feats[1:] - feats[:-1])

        return np.vstack(feat_diff)