"""Module for RL2.
This module contains RL2, RL2Worker and the environment wrapper for RL2.
"""
import abc
import collections
import akro
from dowel import logger
import gym
import numpy as np
from garage import log_multitask_performance, TrajectoryBatch
from garage.envs import EnvSpec
from garage.misc import tensor_utils as np_tensor_utils
from garage.np.algos import MetaRLAlgorithm
from garage.sampler import DefaultWorker
from garage.tf.algos._rl2npo import RL2NPO
class RL2Env(gym.Wrapper):
    """Environment wrapper for RL2.

    In RL2, observation is concatenated with previous action,
    reward and terminal signal to form new observation.

    Observations and actions are flattened before concatenation so that
    the emitted array is one-dimensional, like the observation space
    built by `_create_rl2_obs_space`.

    Args:
        env (gym.Env): An env that will be wrapped.

    """

    def __init__(self, env):
        super().__init__(env)
        action_space = akro.from_gym(self.env.action_space)
        observation_space = self._create_rl2_obs_space()
        self._spec = EnvSpec(action_space=action_space,
                             observation_space=observation_space)

    @staticmethod
    def _flat_dim(space):
        """Return the number of scalars in one sample of `space`.

        Args:
            space (gym.Space): Space exposing a `shape` attribute.

        Returns:
            int: Product of the entries of `space.shape`.

        """
        # np.prod of an empty shape is the float 1.0; cast so the result
        # is always usable as an array dimension.
        return int(np.prod(space.shape))

    def _create_rl2_obs_space(self):
        """Create observation space for RL2.

        Returns:
            akro.Box: Unbounded 1-D space sized for the flattened
                observation, the flattened action, one reward entry and
                one terminal entry.

        """
        obs_flat_dim = self._flat_dim(self.env.observation_space)
        action_flat_dim = self._flat_dim(self.env.action_space)
        return akro.Box(low=-np.inf,
                        high=np.inf,
                        shape=(obs_flat_dim + action_flat_dim + 1 + 1, ))

    def reset(self, **kwargs):
        """Reset the wrapped environment.

        Args:
            kwargs: Keyword arguments. They are accepted for interface
                compatibility and discarded; the wrapped environment is
                reset without arguments.

        Returns:
            np.ndarray: Flattened observation followed by a zero action,
                a zero reward and a zero terminal flag.

        """
        del kwargs
        obs = self.env.reset()
        # No step has been taken yet, so the "previous" action, reward and
        # terminal entries are all zero.
        empty_action = np.zeros(self._flat_dim(self.env.action_space))
        return np.concatenate([np.ravel(obs), empty_action, [0], [0]])

    def step(self, action):
        """Step the wrapped environment.

        Args:
            action (object): Action forwarded unchanged to the wrapped
                environment.

        Returns:
            np.ndarray: Flattened next observation followed by the
                flattened action, the reward and the terminal flag.
            float: reward.
            bool: terminal signal.
            dict: environment info.

        """
        next_obs, reward, done, info = self.env.step(action)
        # np.ravel also lifts scalar actions to 1-D so they can be
        # concatenated.
        next_obs = np.concatenate(
            [np.ravel(next_obs),
             np.ravel(action), [reward], [done]])
        return next_obs, reward, done, info

    @property
    def spec(self):
        """Environment specification.

        Returns:
            EnvSpec: Specification built in `__init__` from the wrapped
                action space and the augmented observation space.

        """
        return self._spec
class RL2Worker(DefaultWorker):
    """Sampling worker for RL2.

    The agent is reset once at the start of `rollout` and is not reset
    again between the `n_paths_per_trial` trajectories sampled in that
    call.

    Args:
        seed(int): The seed to use to initialize random number generators.
        max_path_length(int or float): The maximum length paths which will
            be sampled. Can be (floating point) infinity.
        worker_number(int): The number of the worker where this update is
            occurring. This argument is used to set a different seed for
            each worker.
        n_paths_per_trial (int): Number of trajectories sampled per call
            to `rollout` (one trial / meta batch).

    Attributes:
        agent(Policy or None): The worker's agent.
        env(gym.Env or None): The worker's environment.

    """

    def __init__(
            self,
            *,  # Keyword-only: every argument is an int, easy to mix up.
            seed,
            max_path_length,
            worker_number,
            n_paths_per_trial=2):
        # Assigned before delegating to the parent constructor, as in the
        # original implementation.
        self._n_paths_per_trial = n_paths_per_trial
        super().__init__(seed=seed,
                         max_path_length=max_path_length,
                         worker_number=worker_number)

    def start_rollout(self):
        """Begin a new rollout.

        Resets the step counter and the environment only; the agent is
        left untouched here.
        """
        self._path_length = 0
        self._prev_obs = self.env.reset()

    def rollout(self):
        """Sample one trial of the agent in the environment.

        Returns:
            garage.TrajectoryBatch: The collected trajectory.

        """
        self.agent.reset()
        for _ in range(self._n_paths_per_trial):
            self.start_rollout()
            finished = False
            while not finished:
                finished = self.step_rollout()
        # Tag every collected step with this worker's number.
        n_steps = len(self._rewards)
        self._agent_infos['batch_idx'] = np.full(n_steps,
                                                 self._worker_number)
        return self.collect_rollout()
class NoResetPolicy:
    """A policy wrapper whose `reset` is a no-op.

    For RL2 meta-test, the policy should not reset after meta-RL
    adaptation. The hidden state will be retained as it is where
    the adaptation takes place.

    Args:
        policy (garage.tf.policies.Policy): Policy to wrap.

    """

    def __init__(self, policy):
        self._policy = policy

    def reset(self):
        """Do nothing; the wrapped policy is deliberately not reset."""

    def get_action(self, obs):
        """Get a single action from this policy for the input observation.

        Args:
            obs (numpy.ndarray): Observation from environment.

        Returns:
            tuple[numpy.ndarray, dict]: Whatever the wrapped policy's
                `get_action` returns (predicted action and agent info).

        """
        wrapped = self._policy
        return wrapped.get_action(obs)

    def get_param_values(self):
        """Return values of params.

        Returns:
            np.ndarray: Parameter values reported by the wrapped policy.

        """
        wrapped = self._policy
        return wrapped.get_param_values()

    def set_param_values(self, params):
        """Set param values.

        Args:
            params (np.ndarray): Parameter values forwarded to the wrapped
                policy.

        """
        wrapped = self._policy
        wrapped.set_param_values(params)
# pylint: disable=protected-access
class RL2AdaptedPolicy:
    """A RL2 policy after adaptation.

    The hidden state the policy holds at construction time is
    snapshotted, and `reset` restores the policy to that snapshot.

    Args:
        policy (garage.tf.policies.Policy): Policy itself. Its
            `_prev_hiddens` attribute is read at construction and
            overwritten on every `reset`.

    """

    def __init__(self, policy):
        # Slicing an ndarray with [:] only creates a view, so take a real
        # copy; otherwise in-place updates of the policy's hidden state
        # would silently change the stored snapshot.
        self._initial_hiddens = np.array(policy._prev_hiddens, copy=True)
        self._policy = policy

    def reset(self):
        """Restore the policy's hidden state to the stored snapshot."""
        # Hand out a fresh copy so the policy never shares memory with
        # the snapshot.
        self._policy._prev_hiddens = np.array(self._initial_hiddens,
                                              copy=True)

    def get_action(self, obs):
        """Get a single action from this policy for the input observation.

        Args:
            obs (numpy.ndarray): Observation from environment.

        Returns:
            tuple(numpy.ndarray, dict): Predicted action and agent info,
                as returned by the wrapped policy.

        """
        return self._policy.get_action(obs)

    def get_param_values(self):
        """Return values of params.

        Returns:
            tuple(np.ndarray, np.ndarray): Policy parameters values
                and initial hidden state that will be set every time
                the policy is reset.

        """
        return (self._policy.get_param_values(), self._initial_hiddens)

    def set_param_values(self, params):
        """Set param values.

        Args:
            params (tuple(np.ndarray, np.ndarray)): Two numpy array of
                parameter values, one of the network parameters, one
                for the initial hidden state.

        """
        inner_params, hiddens = params
        self._policy.set_param_values(inner_params)
        self._initial_hiddens = hiddens
class RL2(MetaRLAlgorithm, abc.ABC):
    """RL^2.

    Reference: https://arxiv.org/pdf/1611.02779.pdf.

    When sampling for RL^2, there are more than one environments to be
    sampled from. In the original implementation, within each
    task/environment, all rollouts sampled will be concatenated into one
    single rollout, and fed to the inner algorithm. Thus, returns and
    advantages are calculated across the rollout.

    RL2Worker is required in sampling for RL2.
    See example/tf/rl2_ppo_halfcheetah.py for reference.

    User should not instantiate RL2 directly.
    Currently garage supports PPO and TRPO as inner algorithm. Refer to
    garage/tf/algos/rl2ppo.py and garage/tf/algos/rl2trpo.py.

    Args:
        rl2_max_path_length (int): Maximum length for trajectories with
            respect to RL^2. Notice that it is different from the maximum
            path length for the inner algorithm.
        meta_batch_size (int): Meta batch size.
        task_sampler (garage.experiment.TaskSampler): Task sampler.
        meta_evaluator (garage.experiment.MetaEvaluator): Evaluator for
            meta-RL algorithms. May be None, in which case no
            meta-evaluation is performed.
        n_epochs_per_eval (int): If meta_evaluator is passed,
            meta-evaluation will be performed every `n_epochs_per_eval`
            epochs.
        inner_algo_args (dict): Arguments for inner algorithm.

    """

    def __init__(self, rl2_max_path_length, meta_batch_size, task_sampler,
                 meta_evaluator, n_epochs_per_eval, **inner_algo_args):
        self._inner_algo = RL2NPO(**inner_algo_args)
        self._rl2_max_path_length = rl2_max_path_length
        # Mirror the settings of the inner algorithm that this class
        # needs for sample processing.
        self.env_spec = self._inner_algo._env_spec
        self._n_epochs_per_eval = n_epochs_per_eval
        self._flatten_input = self._inner_algo._flatten_input
        self._policy = self._inner_algo.policy
        self._discount = self._inner_algo._discount
        self._meta_batch_size = meta_batch_size
        self._task_sampler = task_sampler
        self._meta_evaluator = meta_evaluator

    def train(self, runner):
        """Obtain samplers and start actual training for each epoch.

        Args:
            runner (LocalRunner): LocalRunner is passed to give algorithm
                the access to runner.step_epochs(), which provides services
                such as snapshotting and sampler control.

        Returns:
            float: The average return in last epoch, or None if no epoch
                was run.

        """
        last_return = None

        for _ in runner.step_epochs():
            # Test for an evaluator first: without one the evaluation
            # interval is irrelevant and must not be able to raise (e.g.
            # a zero or None interval in the modulo).
            if (self._meta_evaluator is not None
                    and runner.step_itr % self._n_epochs_per_eval == 0):
                self._meta_evaluator.evaluate(self)
            runner.step_path = runner.obtain_samples(
                runner.step_itr,
                env_update=self._task_sampler.sample(self._meta_batch_size))
            last_return = self.train_once(runner.step_itr, runner.step_path)
            runner.step_itr += 1

        return last_return

    def train_once(self, itr, paths):
        """Perform one step of policy optimization given one batch of samples.

        Args:
            itr (int): Iteration number.
            paths (list[dict]): A list of collected paths.

        Returns:
            numpy.float64: Average return.

        """
        paths = self._process_samples(itr, paths)
        logger.log('Optimizing policy...')
        self._inner_algo.optimize_policy(paths)
        return paths['average_return']

    def get_exploration_policy(self):
        """Return a policy used before adaptation to a specific task.

        Each time it is retrieved, this policy should only be evaluated in
        one task.

        Returns:
            object: The policy used to obtain samples that are later
                used for meta-RL adaptation. The underlying policy is
                reset once here and wrapped so later `reset` calls are
                no-ops.

        """
        self._policy.reset()
        return NoResetPolicy(self._policy)

    # pylint: disable=protected-access
    def adapt_policy(self, exploration_policy, exploration_trajectories):
        """Produce a policy adapted for a task.

        Args:
            exploration_policy (garage.Policy): A policy which was returned
                from get_exploration_policy(), and which generated
                exploration_trajectories by interacting with an
                environment. The caller may not use this object after
                passing it into this method.
            exploration_trajectories (garage.TrajectoryBatch): Trajectories
                to adapt to, generated by exploration_policy exploring the
                environment. Not read by this method; only the wrapped
                policy's current state is used.

        Returns:
            garage.tf.policies.Policy: A policy adapted to the task
                represented by the exploration_trajectories.

        """
        return RL2AdaptedPolicy(exploration_policy._policy)

    # pylint: disable=protected-access
    def _process_samples(self, itr, paths):
        # pylint: disable=too-many-statements
        """Return processed sample data based on the collected paths.

        Each input path is modified in place: 'returns' and 'lengths'
        entries are added to it.

        Args:
            itr (int): Iteration number.
            paths (list[dict]): Collected paths for all tasks. In RL^2,
                there are n environments/tasks and paths in each of them
                will be concatenated at some point and fed to the policy.

        Returns:
            dict: Processed sample data, with key
                * observations: (numpy.ndarray)
                * actions: (numpy.ndarray)
                * rewards: (numpy.ndarray)
                * returns: (numpy.ndarray)
                * valids: (numpy.ndarray)
                * agent_infos: (dict)
                * env_infos: (dict)
                * paths: (list[dict])
                * average_return: (numpy.float64)

        Raises:
            ValueError: If 'batch_idx' is not found.

        """
        concatenated_paths = []

        paths_by_task = collections.defaultdict(list)
        for path in paths:
            path['returns'] = np_tensor_utils.discount_cumsum(
                path['rewards'], self._discount)
            path['lengths'] = [len(path['rewards'])]
            # Group paths by the task index, which is stored either on
            # the path itself or per step in its agent_infos.
            if 'batch_idx' in path:
                paths_by_task[path['batch_idx']].append(path)
            elif 'batch_idx' in path['agent_infos']:
                paths_by_task[path['agent_infos']['batch_idx'][0]].append(
                    path)
            else:
                raise ValueError(
                    'Batch idx is required for RL2 but not found, '
                    'Make sure to use garage.tf.algos.rl2.RL2Worker '
                    'for sampling')

        # all path in paths_by_task[i] are sampled from task[i]
        for _paths in paths_by_task.values():
            concatenated_path = self._concatenate_paths(_paths)
            concatenated_paths.append(concatenated_path)

        # stack and pad to max path length of the concatenated
        # path, which will be fed to inner algo
        # i.e. max_path_length * episode_per_task
        concatenated_paths_stacked = (
            np_tensor_utils.stack_and_pad_tensor_dict_list(
                concatenated_paths, self._inner_algo.max_path_length))

        # Build an index -> task name map only when the sampler's
        # environments expose task names.
        name_map = None
        if hasattr(self._task_sampler, '_envs') and hasattr(
                self._task_sampler._envs[0].env, 'all_task_names'):
            names = [
                env.env.all_task_names[0] for env in self._task_sampler._envs
            ]
            name_map = dict(enumerate(names))

        undiscounted_returns = log_multitask_performance(
            itr,
            TrajectoryBatch.from_trajectory_list(self.env_spec, paths),
            self._inner_algo._discount,
            name_map=name_map)

        concatenated_paths_stacked['paths'] = concatenated_paths
        concatenated_paths_stacked['average_return'] = np.mean(
            undiscounted_returns)

        return concatenated_paths_stacked

    def _concatenate_paths(self, paths):
        """Concatenate paths.

        The input paths are from different rollouts but same
        task/environment. In RL^2, paths within each meta batch are all
        concatenated into a single path and fed to the policy.

        Args:
            paths (list[dict]): Input paths. All paths are from different
                rollouts, but the same task/environment.

        Returns:
            dict: Single path holding the inputs joined along the first
                axis, with 'observations' and 'actions' replaced by their
                (optionally) flattened forms, 'valids' set to ones and
                'baselines' set to zeros, both shaped like the rewards.

        """
        if self._flatten_input:
            observations = np.concatenate([
                self.env_spec.observation_space.flatten_n(
                    path['observations']) for path in paths
            ])
        else:
            observations = np.concatenate(
                [path['observations'] for path in paths])
        actions = np.concatenate([
            self.env_spec.action_space.flatten_n(path['actions'])
            for path in paths
        ])
        valids = np.concatenate(
            [np.ones_like(path['rewards']) for path in paths])
        baselines = np.concatenate(
            [np.zeros_like(path['rewards']) for path in paths])

        concatenated_path = np_tensor_utils.concat_tensor_dict_list(paths)
        # Override the generically concatenated entries with the
        # versions computed above.
        concatenated_path['observations'] = observations
        concatenated_path['actions'] = actions
        concatenated_path['valids'] = valids
        concatenated_path['baselines'] = baselines

        return concatenated_path

    @property
    def policy(self):
        """Policy.

        Returns:
            garage.Policy: Policy of the inner algorithm.

        """
        return self._inner_algo.policy

    @property
    def max_path_length(self):
        """Max path length.

        Returns:
            int: Maximum path length with respect to RL^2, as passed to
                the constructor.

        """
        return self._rl2_max_path_length