Source code for garage.tf.policies.gaussian_mlp_policy

"""GaussianMLPPolicy with GaussianMLPModel."""
import akro
import numpy as np
import tensorflow as tf

from garage.tf.models import GaussianMLPModel
from garage.tf.policies.base import StochasticPolicy


class GaussianMLPPolicy(StochasticPolicy):
    """GaussianMLPPolicy with GaussianMLPModel.

    A policy that contains a MLP to make prediction based on a gaussian
    distribution.

    Constructing (or unpickling) the policy builds the model graph and
    creates a callable from the default TensorFlow session, so a default
    session must be active at that point.

    Args:
        env_spec (garage.envs.env_spec.EnvSpec): Environment specification.
            Its action space must be an ``akro.Box``.
        name (str): Model name, also the variable scope.
        hidden_sizes (list[int]): Output dimension of dense layer(s) for
            the MLP for mean. For example, (32, 32) means the MLP consists
            of two hidden layers, each with 32 hidden units.
        hidden_nonlinearity (callable): Activation function for intermediate
            dense layer(s). It should return a tf.Tensor. Set it to
            None to maintain a linear activation.
        hidden_w_init (callable): Initializer function for the weight
            of intermediate dense layer(s). The function should return a
            tf.Tensor.
        hidden_b_init (callable): Initializer function for the bias
            of intermediate dense layer(s). The function should return a
            tf.Tensor.
        output_nonlinearity (callable): Activation function for output dense
            layer. It should return a tf.Tensor. Set it to None to
            maintain a linear activation.
        output_w_init (callable): Initializer function for the weight
            of output dense layer(s). The function should return a
            tf.Tensor.
        output_b_init (callable): Initializer function for the bias
            of output dense layer(s). The function should return a
            tf.Tensor.
        learn_std (bool): Is std trainable.
        adaptive_std (bool): Is std a neural network. If False, it will be a
            parameter.
        std_share_network (bool): Boolean for whether mean and std share
            the same network.
        init_std (float): Initial value for std.
        min_std (float): If not None, the std is at least the value of
            min_std, to avoid numerical issues.
        max_std (float): If not None, the std is at most the value of
            max_std, to avoid numerical issues.
        std_hidden_sizes (list[int]): Output dimension of dense layer(s) for
            the MLP for std. For example, (32, 32) means the MLP consists
            of two hidden layers, each with 32 hidden units.
        std_hidden_nonlinearity (callable): Nonlinearity for each hidden
            layer in the std network.
        std_output_nonlinearity (callable): Nonlinearity for output layer in
            the std network.
        std_parameterization (str): How the std should be parametrized.
            There are a few options:
            - exp: the logarithm of the std will be stored, and applied an
              exponential transformation
            - softplus: the std will be computed as log(1+exp(x))
        layer_normalization (bool): Bool for using layer normalization or
            not.

    """

    def __init__(self,
                 env_spec,
                 name='GaussianMLPPolicy',
                 hidden_sizes=(32, 32),
                 hidden_nonlinearity=tf.nn.tanh,
                 hidden_w_init=tf.glorot_uniform_initializer(),
                 hidden_b_init=tf.zeros_initializer(),
                 output_nonlinearity=None,
                 output_w_init=tf.glorot_uniform_initializer(),
                 output_b_init=tf.zeros_initializer(),
                 learn_std=True,
                 adaptive_std=False,
                 std_share_network=False,
                 init_std=1.0,
                 min_std=1e-6,
                 max_std=None,
                 std_hidden_sizes=(32, 32),
                 std_hidden_nonlinearity=tf.nn.tanh,
                 std_output_nonlinearity=None,
                 std_parameterization='exp',
                 layer_normalization=False):
        # A Gaussian over actions only makes sense for continuous (Box)
        # action spaces. Kept as an assert so the exception type callers
        # may rely on (AssertionError) is unchanged.
        assert isinstance(env_spec.action_space, akro.Box), (
            'GaussianMLPPolicy only works with akro.Box action spaces.')
        super().__init__(name, env_spec)
        self.obs_dim = env_spec.observation_space.flat_dim
        self.action_dim = env_spec.action_space.flat_dim

        self.model = GaussianMLPModel(
            output_dim=self.action_dim,
            hidden_sizes=hidden_sizes,
            hidden_nonlinearity=hidden_nonlinearity,
            hidden_w_init=hidden_w_init,
            hidden_b_init=hidden_b_init,
            output_nonlinearity=output_nonlinearity,
            output_w_init=output_w_init,
            output_b_init=output_b_init,
            learn_std=learn_std,
            adaptive_std=adaptive_std,
            std_share_network=std_share_network,
            init_std=init_std,
            min_std=min_std,
            max_std=max_std,
            std_hidden_sizes=std_hidden_sizes,
            std_hidden_nonlinearity=std_hidden_nonlinearity,
            std_output_nonlinearity=std_output_nonlinearity,
            std_parameterization=std_parameterization,
            layer_normalization=layer_normalization,
            name='GaussianMLPModel')

        self._initialize()

    def _initialize(self):
        """Build the default network and the callable used for sampling.

        Creates an observation placeholder of shape ``(None, obs_dim)``,
        builds the model under this policy's variable scope, and stores in
        ``self._f_dist`` a session callable that maps a batch of flat
        observations to ``[mean, log_std]``.

        Uses ``tf.compat.v1.get_default_session()``, so a default session
        must be active when this runs.
        """
        state_input = tf.compat.v1.placeholder(tf.float32,
                                               shape=(None, self.obs_dim))

        with tf.compat.v1.variable_scope(self.name) as vs:
            # Remember the scope so dist_info_sym can re-enter it later.
            self._variable_scope = vs
            self.model.build(state_input)

        default_network = self.model.networks['default']
        self._f_dist = tf.compat.v1.get_default_session().make_callable(
            [default_network.mean, default_network.log_std],
            feed_list=[default_network.input])

    @property
    def vectorized(self):
        """Vectorized or not."""
        return True

    def dist_info_sym(self, obs_var, state_info_vars=None, name='default'):
        """Symbolic graph of the distribution.

        Args:
            obs_var (tf.Tensor): Observation tensor fed to the model.
            state_info_vars: Accepted for interface compatibility; not used
                by this method.
            name (str): Name passed to ``self.model.build`` for the network
                being built.

        Returns:
            dict: Symbolic tensors under the keys ``mean`` and ``log_std``.

        """
        with tf.compat.v1.variable_scope(self._variable_scope):
            mean_var, log_std_var, _, _ = self.model.build(obs_var, name=name)
        return dict(mean=mean_var, log_std=log_std_var)

    def get_action(self, observation):
        """Get action from the policy.

        Args:
            observation: A single observation from the environment.

        Returns:
            tuple: The sampled action, and a dict with the distribution
                parameters under the keys ``mean`` and ``log_std``.

        """
        flat_obs = self.observation_space.flatten(observation)
        # The callable works on batches, so wrap the single observation in
        # a batch of one and take element 0 of each result below.
        mean, log_std = self._f_dist([flat_obs])
        rnd = np.random.normal(size=mean.shape)
        # Sample from N(mean, std^2) using standard-normal noise.
        sample = rnd * np.exp(log_std) + mean
        sample = self.action_space.unflatten(sample[0])
        mean = self.action_space.unflatten(mean[0])
        log_std = self.action_space.unflatten(log_std[0])
        return sample, dict(mean=mean, log_std=log_std)

    def get_actions(self, observations):
        """Get actions from the policy.

        Args:
            observations: A batch of observations from the environment.

        Returns:
            tuple: The sampled actions, and a dict with the distribution
                parameters under the keys ``mean`` and ``log_std``.

        """
        flat_obs = self.observation_space.flatten_n(observations)
        means, log_stds = self._f_dist(flat_obs)
        rnd = np.random.normal(size=means.shape)
        # Sample from N(mean, std^2) using standard-normal noise.
        samples = rnd * np.exp(log_stds) + means
        samples = self.action_space.unflatten_n(samples)
        means = self.action_space.unflatten_n(means)
        log_stds = self.action_space.unflatten_n(log_stds)
        return samples, dict(mean=means, log_std=log_stds)

    def get_params(self, trainable=True):
        """Get the trainable variables.

        Args:
            trainable (bool): Accepted for interface compatibility. It is
                not consulted: the result of ``get_trainable_vars()`` is
                returned regardless of its value.

        Returns:
            The value returned by ``self.get_trainable_vars()``.

        """
        return self.get_trainable_vars()

    @property
    def distribution(self):
        """Policy distribution."""
        return self.model.networks['default'].dist

    def __getstate__(self):
        """Object.__getstate__.

        Returns:
            dict: The parent's state without ``_f_dist``, which
                ``__setstate__`` rebuilds via ``_initialize``.

        """
        new_dict = super().__getstate__()
        del new_dict['_f_dist']
        return new_dict

    def __setstate__(self, state):
        """Object.__setstate__.

        Args:
            state (dict): State produced by ``__getstate__``.

        """
        super().__setstate__(state)
        # Rebuild the graph and the session callable dropped on pickling.
        self._initialize()