Source code for garage.torch.algos.trpo

"""Trust Region Policy Optimization."""
import torch

from garage.torch.algos import VPG
from garage.torch.optimizers import ConjugateGradientOptimizer
from garage.torch.optimizers import OptimizerWrapper


class TRPO(VPG):
    """Trust Region Policy Optimization (TRPO).

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        policy (garage.torch.policies.Policy): Policy.
        value_function (garage.torch.value_functions.ValueFunction): The value
            function.
        policy_optimizer (garage.torch.optimizer.OptimizerWrapper): Optimizer
            for policy.
        vf_optimizer (garage.torch.optimizer.OptimizerWrapper): Optimizer for
            value function.
        max_path_length (int): Maximum length of a single rollout.
        num_train_per_epoch (int): Number of train_once calls per epoch.
        discount (float): Discount.
        gae_lambda (float): Lambda used for generalized advantage
            estimation.
        center_adv (bool): Whether to rescale the advantages
            so that they have mean 0 and standard deviation 1.
        positive_adv (bool): Whether to shift the advantages
            so that they are always positive. When used in
            conjunction with center_adv the advantages will be
            standardized before shifting.
        policy_ent_coeff (float): The coefficient of the policy entropy.
            Setting it to zero would mean no entropy regularization.
        use_softplus_entropy (bool): Whether to estimate the softmax
            distribution of the entropy to prevent the entropy from being
            negative.
        stop_entropy_gradient (bool): Whether to stop the entropy gradient.
        entropy_method (str): A string from: 'max', 'regularized',
            'no_entropy'. The type of entropy method to use. 'max' adds the
            dense entropy to the reward for each time step. 'regularized' adds
            the mean entropy to the surrogate objective. See
            https://arxiv.org/abs/1805.00909 for more details.

    """

    def __init__(self,
                 env_spec,
                 policy,
                 value_function,
                 policy_optimizer=None,
                 vf_optimizer=None,
                 max_path_length=100,
                 num_train_per_epoch=1,
                 discount=0.99,
                 gae_lambda=0.98,
                 center_adv=True,
                 positive_adv=False,
                 policy_ent_coeff=0.0,
                 use_softplus_entropy=False,
                 stop_entropy_gradient=False,
                 entropy_method='no_entropy'):
        if policy_optimizer is None:
            policy_optimizer = OptimizerWrapper(
                (ConjugateGradientOptimizer, dict(max_constraint_value=0.01)),
                policy)
        if vf_optimizer is None:
            vf_optimizer = OptimizerWrapper(
                (torch.optim.Adam, dict(lr=2.5e-4)),
                value_function,
                max_optimization_epochs=10,
                minibatch_size=64)

        super().__init__(env_spec=env_spec,
                         policy=policy,
                         value_function=value_function,
                         policy_optimizer=policy_optimizer,
                         vf_optimizer=vf_optimizer,
                         max_path_length=max_path_length,
                         num_train_per_epoch=num_train_per_epoch,
                         discount=discount,
                         gae_lambda=gae_lambda,
                         center_adv=center_adv,
                         positive_adv=positive_adv,
                         policy_ent_coeff=policy_ent_coeff,
                         use_softplus_entropy=use_softplus_entropy,
                         stop_entropy_gradient=stop_entropy_gradient,
                         entropy_method=entropy_method)

    def _compute_objective(self, advantages, obs, actions, rewards):
        r"""Compute objective value.

        Args:
            advantages (torch.Tensor): Advantage value at each step
                with shape :math:`(N \dot [T], )`.
            obs (torch.Tensor): Observation from the environment
                with shape :math:`(N \dot [T], O*)`.
            actions (torch.Tensor): Actions fed to the environment
                with shape :math:`(N \dot [T], A*)`.
            rewards (torch.Tensor): Acquired rewards
                with shape :math:`(N \dot [T], )`.

        Returns:
            torch.Tensor: Calculated objective values
                with shape :math:`(N \dot [T], )`.

        """
        with torch.no_grad():
            old_ll = self._old_policy(obs)[0].log_prob(actions)

        new_ll = self.policy(obs)[0].log_prob(actions)
        likelihood_ratio = (new_ll - old_ll).exp()

        # Calculate surrogate
        surrogate = likelihood_ratio * advantages

        return surrogate

    def _train_policy(self, obs, actions, rewards, advantages):
        r"""Train the policy.

        Args:
            obs (torch.Tensor): Observation from the environment
                with shape :math:`(N, O*)`.
            actions (torch.Tensor): Actions fed to the environment
                with shape :math:`(N, A*)`.
            rewards (torch.Tensor): Acquired rewards
                with shape :math:`(N, )`.
            advantages (torch.Tensor): Advantage value at each step
                with shape :math:`(N, )`.

        Returns:
            torch.Tensor: Calculated mean scalar value of policy loss (float).

        """
        self._policy_optimizer.zero_grad()
        loss = self._compute_loss_with_adv(obs, actions, rewards, advantages)
        loss.backward()
        self._policy_optimizer.step(
            f_loss=lambda: self._compute_loss_with_adv(obs, actions, rewards,
                                                       advantages),
            f_constraint=lambda: self._compute_kl_constraint(obs))

        return loss
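
Below is a minimal, hypothetical usage sketch, not part of the garage source. It assumes an already-constructed, garage-wrapped environment `env` exposing an `env.spec` attribute, and uses garage's GaussianMLPPolicy and GaussianMLPValueFunction; the surrounding experiment setup (runner/trainer wiring, environment wrappers) varies across garage versions.

# Illustrative sketch only; `env` is assumed to exist and expose `env.spec`.
from garage.torch.policies import GaussianMLPPolicy
from garage.torch.value_functions import GaussianMLPValueFunction

policy = GaussianMLPPolicy(env.spec, hidden_sizes=(32, 32))
value_function = GaussianMLPValueFunction(env_spec=env.spec,
                                          hidden_sizes=(32, 32))

# Leaving policy_optimizer/vf_optimizer as None picks the defaults built in
# __init__ above: a ConjugateGradientOptimizer with a KL constraint of 0.01
# for the policy, and Adam for the value function.
algo = TRPO(env_spec=env.spec,
            policy=policy,
            value_function=value_function,
            discount=0.99,
            gae_lambda=0.95)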