Source code for garage.np.algos.cem

"""Cross Entropy Method."""
import collections

from dowel import logger, tabular
import numpy as np

from garage import log_performance, TrajectoryBatch
from garage.np import paths_to_tensors
from garage.np.algos.rl_algorithm import RLAlgorithm
from garage.tf.samplers import BatchSampler


class CEM(RLAlgorithm):
    """Cross Entropy Method.

    CEM works by iteratively optimizing a Gaussian distribution over policy
    parameters.

    In each epoch, CEM does the following:
    1. Sample n_samples policies from a Gaussian distribution of
       mean cur_mean and std cur_std.
    2. Do rollouts for each policy.
    3. Update cur_mean and cur_std by doing Maximum Likelihood Estimation
       over the n_best top policies in terms of return.

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        policy (garage.np.policies.Policy): Action policy.
        baseline (garage.np.baselines.Baseline): Baseline for GAE
            (Generalized Advantage Estimation).
        n_samples (int): Number of policies sampled in one epoch.
        discount (float): Environment reward discount.
        max_path_length (int): Maximum length of a single rollout.
        best_frac (float): Fraction of the sampled policies kept as elites
            when refitting the parameter distribution.
        init_std (float): Initial std for policy param distribution.
        extra_std (float): Decaying std added to param distribution.
        extra_decay_time (float): Epochs that it takes to decay extra std.

    """

    def __init__(self,
                 env_spec,
                 policy,
                 baseline,
                 n_samples,
                 discount=0.99,
                 max_path_length=500,
                 init_std=1,
                 best_frac=0.05,
                 extra_std=1.,
                 extra_decay_time=100):
        self.policy = policy
        self.max_path_length = max_path_length
        self.sampler_cls = BatchSampler

        self._best_frac = best_frac
        self._baseline = baseline
        self._init_std = init_std
        self._extra_std = extra_std
        self._extra_decay_time = extra_decay_time
        self._episode_reward_mean = collections.deque(maxlen=100)
        self._env_spec = env_spec
        self._discount = discount
        self._n_samples = n_samples

        self._cur_std = None
        self._cur_mean = None
        self._cur_params = None
        self._all_returns = None
        self._all_params = None
        self._n_best = None
        self._n_params = None

    def _sample_params(self, epoch):
        """Return sample parameters.

        Args:
            epoch (int): Epoch number.

        Returns:
            np.ndarray: A numpy array of parameter values.

        """
        extra_var_mult = max(1.0 - epoch / self._extra_decay_time, 0)
        sample_std = np.sqrt(
            np.square(self._cur_std) +
            np.square(self._extra_std) * extra_var_mult)
        return np.random.standard_normal(
            self._n_params) * sample_std + self._cur_mean
    def train(self, runner):
        """Initialize variables and start training.

        Args:
            runner (LocalRunner): LocalRunner is passed to give the algorithm
                access to runner.step_epochs(), which provides services such
                as snapshotting and sampler control.

        Returns:
            float: The average return in the last epoch cycle.

        """
        # epoch-wise
        self._cur_std = self._init_std
        self._cur_mean = self.policy.get_param_values()
        # epoch-cycle-wise
        self._cur_params = self._cur_mean
        self._all_returns = []
        self._all_params = [self._cur_mean.copy()]
        # constant
        self._n_best = int(self._n_samples * self._best_frac)
        assert self._n_best >= 1, (
            'n_samples is too low. Make sure that n_samples * best_frac >= 1')
        self._n_params = len(self._cur_mean)

        # start actual training
        last_return = None

        for _ in runner.step_epochs():
            for _ in range(self._n_samples):
                runner.step_path = runner.obtain_samples(runner.step_itr)
                last_return = self.train_once(runner.step_itr,
                                              runner.step_path)
                runner.step_itr += 1

        return last_return
    def train_once(self, itr, paths):
        """Perform one step of policy optimization given one batch of samples.

        Args:
            itr (int): Iteration number.
            paths (list[dict]): A list of collected paths.

        Returns:
            float: The average return of the epoch cycle.

        """
        # -- Stage: Calculate baseline
        if hasattr(self._baseline, 'predict_n'):
            baseline_predictions = self._baseline.predict_n(paths)
        else:
            baseline_predictions = [
                self._baseline.predict(path) for path in paths
            ]

        # -- Stage: Pre-process samples based on collected paths
        samples_data = paths_to_tensors(paths, self.max_path_length,
                                        baseline_predictions, self._discount)

        # -- Stage: Run and calculate performance of the algorithm
        undiscounted_returns = log_performance(
            itr,
            TrajectoryBatch.from_trajectory_list(self._env_spec, paths),
            discount=self._discount)
        self._episode_reward_mean.extend(undiscounted_returns)
        tabular.record('Extras/EpisodeRewardMean',
                       np.mean(self._episode_reward_mean))
        samples_data['average_return'] = np.mean(undiscounted_returns)

        epoch = itr // self._n_samples
        i_sample = itr - epoch * self._n_samples
        tabular.record('Epoch', epoch)
        tabular.record('# Sample', i_sample)

        # -- Stage: Process samples_data
        rtn = samples_data['average_return']
        self._all_returns.append(samples_data['average_return'])

        # -- Stage: Update policy distribution.
        if (itr + 1) % self._n_samples == 0:
            avg_rtns = np.array(self._all_returns)
            best_inds = np.argsort(-avg_rtns)[:self._n_best]
            best_params = np.array(self._all_params)[best_inds]

            # MLE of normal distribution
            self._cur_mean = best_params.mean(axis=0)
            self._cur_std = best_params.std(axis=0)
            self.policy.set_param_values(self._cur_mean)

            # Clear for next epoch
            rtn = max(self._all_returns)
            self._all_returns.clear()
            self._all_params.clear()

        # -- Stage: Generate a new policy for next path sampling
        self._cur_params = self._sample_params(itr)
        self._all_params.append(self._cur_params.copy())
        self.policy.set_param_values(self._cur_params)

        logger.log(tabular)
        return rtn
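
A note on the bookkeeping above: train() advances runner.step_itr once per sampled policy, so train_once() recovers the epoch and the within-epoch sample index from the iteration counter, and the Gaussian is refit on the last sample of each epoch. A minimal sketch of that mapping, assuming n_samples = 4 (the value is illustrative, not from this module):

# Illustrative only: shows how itr maps to the Epoch / '# Sample' values
# logged by train_once(), and when the distribution refit fires.
n_samples = 4
for itr in range(2 * n_samples):
    epoch = itr // n_samples
    i_sample = itr - epoch * n_samples
    refit = (itr + 1) % n_samples == 0  # True on the last sample of an epoch
    print(f'itr={itr}  epoch={epoch}  sample={i_sample}  refit={refit}')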
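
The distribution update itself does not depend on anything garage-specific: sample n_samples parameter vectors from the current Gaussian (with the decaying extra std from _sample_params()), score each one, keep the best_frac elites, and refit the mean and std by MLE as in train_once(). Below is a minimal self-contained sketch of that loop on a toy quadratic objective; the objective function, the target vector, and the hyperparameter values are illustrative assumptions, with rollout returns replaced by a direct score.

# Standalone sketch of the CEM update, assuming a toy quadratic objective
# in place of policy rollouts. Only the update rule mirrors the module above.
import numpy as np

n_params = 5
n_samples = 20          # parameter vectors sampled per epoch (cf. n_samples)
best_frac = 0.25        # elite fraction (cf. best_frac)
n_best = int(n_samples * best_frac)
init_std, extra_std, extra_decay_time = 1.0, 1.0, 100

target = np.arange(n_params, dtype=float)  # illustrative optimum


def objective(params):
    # Stand-in for the average undiscounted return of a rollout.
    return -np.sum((params - target) ** 2)


cur_mean = np.zeros(n_params)
cur_std = init_std

for epoch in range(50):
    # Decaying exploration noise, as in _sample_params().
    extra_var_mult = max(1.0 - epoch / extra_decay_time, 0)
    sample_std = np.sqrt(np.square(cur_std) +
                         np.square(extra_std) * extra_var_mult)

    # 1. Sample n_samples parameter vectors from the current Gaussian.
    params = (np.random.standard_normal((n_samples, n_params)) * sample_std
              + cur_mean)

    # 2. Evaluate each sample (rollouts in the real algorithm).
    returns = np.array([objective(p) for p in params])

    # 3. MLE refit on the n_best elites, as in train_once().
    best_inds = np.argsort(-returns)[:n_best]
    cur_mean = params[best_inds].mean(axis=0)
    cur_std = params[best_inds].std(axis=0)

print('final mean:', np.round(cur_mean, 2))  # approaches `target`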