"""Data types for agent-based learning."""
from dataclasses import dataclass
import enum
from typing import Dict, List, Union
import warnings
import numpy as np
from garage.np import (concat_tensor_dict_list, pad_batch_array,
                       slice_nested_dict, stack_tensor_dict_list)
# pylint: disable=too-many-lines
class StepType(enum.IntEnum):
"""Defines the status of a :class:`~TimeStep` within a sequence.
    Note that the last :class:`~TimeStep` in a sequence can either be
    :attr:`StepType.TERMINAL` or :attr:`StepType.TIMEOUT`.

    Suppose max_episode_length = 5:

    * A successful sequence terminated at step 4 will look like:
        FIRST, MID, MID, TERMINAL
    * A successful sequence terminated at step 5 will look like:
        FIRST, MID, MID, MID, TERMINAL
    * An unsuccessful sequence truncated by time limit will look like:
        FIRST, MID, MID, MID, TIMEOUT
"""
# Denotes the first :class:`~TimeStep` in a sequence.
FIRST = 0
# Denotes any :class:`~TimeStep` in the middle of a sequence (i.e. not the
# first or last one).
MID = 1
# Denotes the last :class:`~TimeStep` in a sequence that terminates
# successfully.
TERMINAL = 2
# Denotes the last :class:`~TimeStep` in a sequence truncated by time
# limit.
TIMEOUT = 3
    @classmethod
def get_step_type(cls, step_cnt, max_episode_length, done):
"""Determines the step type based on step cnt and done signal.
Args:
step_cnt (int): current step cnt of the environment.
max_episode_length (int): maximum episode length.
done (bool): the done signal returned by Environment.
Returns:
StepType: the step type.
Raises:
ValueError: if step_cnt is < 1. In this case a environment's
`reset()` is likely not called yet and the step_cnt is None.
"""
        if step_cnt is None or step_cnt < 1:
            raise ValueError('Expect step_cnt to be >= 1, but got {} '
                             'instead. Did you forget to call `reset('
                             ')`?'.format(step_cnt))
        if max_episode_length is not None and step_cnt >= max_episode_length:
            return StepType.TIMEOUT
        elif done:
            return StepType.TERMINAL
        elif step_cnt == 1:
            return StepType.FIRST
        else:
            return StepType.MID
@dataclass(frozen=True)
class TimeStep:
r"""A single TimeStep in an environment.
A :class:`~TimeStep` represents a single sample when an agent interacts
with an environment. It describes as SARS (State–action–reward–state)
tuple that characterizes the evolution of a MDP.
Attributes:
env_spec (EnvSpec): Specification for the environment from which this
data was sampled.
        episode_info (dict[str, np.ndarray]): A dict of numpy arrays of shape
            :math:`(S^*,)` containing episode-level information of each
episode. For example, in goal-conditioned reinforcement learning
this could contain the goal state for each episode.
observation (numpy.ndarray): A numpy array of shape :math:`(O^*)`
containing the observation for this time step in the
environment. These must conform to
:obj:`EnvStep.observation_space`.
The observation before applying the action.
`None` if `step_type` is `StepType.FIRST`, i.e. at the start of a
sequence.
action (numpy.ndarray): A numpy array of shape :math:`(A^*)`
containing the action for this time step. These must conform
to :obj:`EnvStep.action_space`.
`None` if `step_type` is `StepType.FIRST`, i.e. at the start of a
sequence.
reward (float): A float representing the reward for taking the action
given the observation, at this time step.
`None` if `step_type` is `StepType.FIRST`, i.e. at the start of a
sequence.
next_observation (numpy.ndarray): A numpy array of shape :math:`(O^*)`
containing the observation for this time step in the
environment. These must conform to
:obj:`EnvStep.observation_space`.
The observation after applying the action.
        env_info (dict): A dict of arbitrary environment state information.
agent_info (dict): A dict of arbitrary agent
state information. For example, this may contain the hidden states
from an RNN policy.
        step_type (StepType): a :class:`~StepType` enum value. Can be one of
            :attr:`~StepType.FIRST`, :attr:`~StepType.MID`,
            :attr:`~StepType.TERMINAL`, or :attr:`~StepType.TIMEOUT`.
"""
env_spec: 'garage.EnvSpec' # NOQA: F821
episode_info: Dict[str, np.ndarray]
observation: np.ndarray
action: np.ndarray
reward: float
next_observation: np.ndarray
env_info: Dict[str, np.ndarray]
agent_info: Dict[str, np.ndarray]
step_type: StepType
@property
def first(self):
"""bool: Whether this step is the first of its episode."""
return self.step_type is StepType.FIRST
@property
def mid(self):
"""bool: Whether this step is in the middle of its episode."""
return self.step_type is StepType.MID
@property
def terminal(self):
"""bool: Whether this step records a termination condition."""
return self.step_type is StepType.TERMINAL
@property
def timeout(self):
"""bool: Whether this step records a timeout condition."""
return self.step_type is StepType.TIMEOUT
@property
def last(self):
"""bool: Whether this step is the last of its episode."""
return self.step_type is StepType.TERMINAL or self.step_type \
is StepType.TIMEOUT
    @classmethod
def from_env_step(cls, env_step, last_observation, agent_info,
episode_info):
"""Create a TimeStep from a EnvStep.
Args:
env_step (EnvStep): the env step returned by the environment.
last_observation (numpy.ndarray): A numpy array of shape
:math:`(O^*)` containing the observation for this time
step in the environment. These must conform to
:attr:`EnvStep.observation_space`.
The observation before applying the action.
agent_info (dict): A dict of arbitrary agent state information.
episode_info (dict): A dict of arbitrary information associated
with the whole episode.
Returns:
TimeStep: The TimeStep with all information of EnvStep plus the
agent info.
"""
return cls(env_spec=env_step.env_spec,
episode_info=episode_info,
observation=last_observation,
action=env_step.action,
reward=env_step.reward,
next_observation=env_step.observation,
env_info=env_step.env_info,
agent_info=agent_info,
step_type=env_step.step_type)
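
# A hedged sketch of the intended `TimeStep.from_env_step` call pattern in a
# sampling loop; `env` and `agent` are hypothetical stand-ins, not names
# defined in this module:
#
#     >>> last_obs, episode_info = env.reset()              # doctest: +SKIP
#     >>> action, agent_info = agent.get_action(last_obs)   # doctest: +SKIP
#     >>> env_step = env.step(action)                       # doctest: +SKIP
#     >>> ts = TimeStep.from_env_step(env_step, last_obs, agent_info,
#     ...                             episode_info)         # doctest: +SKIP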
@dataclass(frozen=True)
class TimeStepBatch:
# pylint: disable=missing-param-doc, missing-type-doc
"""A tuple representing a batch of TimeSteps.
Data type for off-policy algorithms, imitation learning and batch-RL.
Attributes:
env_spec (EnvSpec): Specification for the environment from
which this data was sampled.
episode_infos (dict[str, np.ndarray]): A dict of numpy arrays
containing the episode-level information of each episode. Each
value of this dict should be a numpy array of shape :math:`(N,
S^*)`. For example, in goal-conditioned reinforcement learning this
could contain the goal state for each episode.
observations (numpy.ndarray): Non-flattened array of observations.
Typically has shape (batch_size, S^*) (the unflattened state space
of the current environment).
actions (numpy.ndarray): Non-flattened array of actions. Must
have shape (batch_size, S^*) (the unflattened action space of the
current environment).
        rewards (numpy.ndarray): Array of rewards of shape (batch_size,).
        next_observations (numpy.ndarray): Non-flattened array of next
            observations. Has shape (batch_size, S^*). next_observations[i]
            was observed by the agent after taking actions[i].
        env_infos (dict): A dict of arbitrary environment state
            information.
agent_infos (dict): A dict of arbitrary agent state information. For
example, this may contain the hidden states from an RNN policy.
        step_types (numpy.ndarray): A numpy array of `StepType` with shape
            (batch_size,) containing the time step types for all transitions
            in this batch.
Raises:
ValueError: If any of the above attributes do not conform to their
prescribed types and shapes.
"""
    env_spec: 'garage.EnvSpec'  # NOQA: F821
    episode_infos: Dict[str, Union[np.ndarray, dict]]
    observations: np.ndarray
    actions: np.ndarray
    rewards: np.ndarray
    next_observations: np.ndarray
    agent_infos: Dict[str, Union[np.ndarray, dict]]
    env_infos: Dict[str, Union[np.ndarray, dict]]
    step_types: np.ndarray

    def __post_init__(self):
        """Runs integrity checking after __init__."""
        check_timestep_batch(self, np.ndarray)
    @classmethod
def concatenate(cls, *batches):
"""Concatenate two or more :class:`TimeStepBatch`s.
Args:
batches (list[TimeStepBatch]): Batches to concatenate.
Returns:
TimeStepBatch: The concatenation of the batches.
Raises:
ValueError: If no TimeStepBatches are provided.
"""
if len(batches) < 1:
raise ValueError('Please provide at least one TimeStepBatch to '
'concatenate')
episode_infos = {
k: np.concatenate([b.episode_infos[k] for b in batches])
for k in batches[0].episode_infos.keys()
}
env_infos = {
k: np.concatenate([b.env_infos[k] for b in batches])
for k in batches[0].env_infos.keys()
}
agent_infos = {
k: np.concatenate([b.agent_infos[k] for b in batches])
for k in batches[0].agent_infos.keys()
}
return cls(
env_spec=batches[0].env_spec,
episode_infos=episode_infos,
observations=np.concatenate(
[batch.observations for batch in batches]),
actions=np.concatenate([batch.actions for batch in batches]),
rewards=np.concatenate([batch.rewards for batch in batches]),
next_observations=np.concatenate(
[batch.next_observations for batch in batches]),
env_infos=env_infos,
agent_infos=agent_infos,
step_types=np.concatenate([batch.step_types for batch in batches]))
    def split(self) -> List['TimeStepBatch']:
"""Split a :class:`~TimeStepBatch` into a list of :class:`~TimeStepBatch`s.
The opposite of concatenate.
Returns:
list[TimeStepBatch]: A list of :class:`TimeStepBatch`s, with one
:class:`~TimeStep` per :class:`~TimeStepBatch`.
"""
time_steps = []
for i in range(len(self.rewards)):
time_step = TimeStepBatch(
episode_infos={
k: np.asarray([v[i]])
for (k, v) in self.episode_infos.items()
},
env_spec=self.env_spec,
observations=np.asarray([self.observations[i]]),
actions=np.asarray([self.actions[i]]),
rewards=np.asarray([self.rewards[i]]),
next_observations=np.asarray([self.next_observations[i]]),
env_infos={
k: np.asarray([v[i]])
for (k, v) in self.env_infos.items()
},
agent_infos={
k: np.asarray([v[i]])
for (k, v) in self.agent_infos.items()
},
step_types=np.asarray([self.step_types[i]], dtype=StepType))
time_steps.append(time_step)
return time_steps
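
    # A hedged round-trip sketch: `batch` stands in for any valid
    # TimeStepBatch. split() yields one single-step batch per transition,
    # and concatenate() reassembles them:
    #
    #     >>> singles = batch.split()                        # doctest: +SKIP
    #     >>> rejoined = TimeStepBatch.concatenate(*singles)  # doctest: +SKIP
    #     >>> np.array_equal(rejoined.rewards, batch.rewards)  # doctest: +SKIP
    #     True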
    def to_time_step_list(self) -> List[Dict[str, np.ndarray]]:
"""Convert the batch into a list of dictionaries.
        Breaks the :class:`~TimeStepBatch` into a list of single-time-step
        sample dictionaries; len(rewards) (i.e. the number of time steps)
        dictionaries are returned.
Returns:
            list[dict[str, np.ndarray or dict[str, np.ndarray]]]: Keys:
                episode_infos (dict[str, np.ndarray]): A dict of numpy arrays
                    containing the episode-level information of each episode.
                    Each value of this dict must be a numpy array of shape
                    :math:`(S^*,)`. For example, in goal-conditioned
                    reinforcement learning this could contain the goal state
                    for each episode.
                observations (numpy.ndarray): Non-flattened array of
                    observations. Typically has shape (batch_size, S^*) (the
                    unflattened state space of the current environment).
                actions (numpy.ndarray): Non-flattened array of actions.
                    Must have shape (batch_size, S^*) (the unflattened
                    action space of the current environment).
                rewards (numpy.ndarray): Array of rewards of shape
                    (batch_size,) (1D array of length batch_size).
                next_observations (numpy.ndarray): Non-flattened array of
                    next observations. Has shape (batch_size, S^*).
                    next_observations[i] was observed by the agent after
                    taking actions[i].
                env_infos (dict): A dict of arbitrary environment state
                    information.
                agent_infos (dict): A dict of arbitrary agent state
                    information. For example, this may contain the hidden
                    states from an RNN policy.
                step_types (numpy.ndarray): A numpy array of `StepType` with
                    shape (batch_size,) containing the time step types for
                    all transitions in this batch.
"""
samples = []
for i in range(len(self.rewards)):
samples.append({
'episode_infos': {
k: np.asarray([v[i]])
for (k, v) in self.episode_infos.items()
},
'observations':
np.asarray([self.observations[i]]),
'actions':
np.asarray([self.actions[i]]),
'rewards':
np.asarray([self.rewards[i]]),
'next_observations':
np.asarray([self.next_observations[i]]),
'env_infos':
{k: np.asarray([v[i]])
for (k, v) in self.env_infos.items()},
'agent_infos':
{k: np.asarray([v[i]])
for (k, v) in self.agent_infos.items()},
'step_types':
np.asarray([self.step_types[i]])
})
return samples
@property
def terminals(self):
"""Get an array of boolean indicating ternianal information.
Returns:
numpy.ndarray: An array of boolean of shape :math:`(N,)`
indicating whether the `StepType is `TERMINAL
"""
return np.array([s == StepType.TERMINAL for s in self.step_types])
    @classmethod
def from_time_step_list(cls, env_spec, ts_samples):
"""Create a :class:`~TimeStepBatch` from a list of time step dictionaries.
Args:
env_spec (EnvSpec): Specification for the environment from which
this data was sampled.
ts_samples (list[dict[str, np.ndarray or dict[str, np.ndarray]]]):
keys:
* episode_infos (dict[str, np.ndarray]): A dict of numpy arrays
containing the episode-level information of each episode.
Each value of this dict must be a numpy array of shape
:math:`(N, S^*)`. For example, in goal-conditioned
reinforcement learning this could contain the goal state
for each episode.
* observations (numpy.ndarray): Non-flattened array of
observations.
Typically has shape (batch_size, S^*) (the unflattened
state space of the current environment).
* actions (numpy.ndarray): Non-flattened array of actions.
Must have shape (batch_size, S^*) (the unflattened action
space of the current environment).
                * rewards (numpy.ndarray): Array of rewards of shape
                    (batch_size,) (1D array of length batch_size).
                * next_observations (numpy.ndarray): Non-flattened array of
                    next observations. Has shape (batch_size, S^*).
                    next_observations[i] was observed by the agent after
                    taking actions[i].
                * env_infos (dict): A dict of arbitrary environment state
                    information.
* agent_infos (dict): A dict of arbitrary agent
state information. For example, this may contain the
hidden states from an RNN policy.
                * step_types (numpy.ndarray): A numpy array of `StepType`
                    with shape (batch_size,) containing the time step types
                    for all transitions in this batch.
Returns:
TimeStepBatch: The concatenation of samples.
Raises:
ValueError: If no dicts are provided.
"""
if len(ts_samples) < 1:
raise ValueError('Please provide at least one dict')
ts_batches = [
TimeStepBatch(episode_infos=sample['episode_infos'],
env_spec=env_spec,
observations=sample['observations'],
actions=sample['actions'],
rewards=sample['rewards'],
next_observations=sample['next_observations'],
env_infos=sample['env_infos'],
agent_infos=sample['agent_infos'],
step_types=sample['step_types'])
for sample in ts_samples
]
return TimeStepBatch.concatenate(*ts_batches)
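
# A hedged sketch: to_time_step_list() and from_time_step_list() form a
# round trip; `batch` stands in for any valid TimeStepBatch:
#
#     >>> samples = batch.to_time_step_list()               # doctest: +SKIP
#     >>> rebuilt = TimeStepBatch.from_time_step_list(
#     ...     batch.env_spec, samples)                      # doctest: +SKIP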
@dataclass(frozen=True, init=False)
class EpisodeBatch(TimeStepBatch):
# pylint: disable=missing-return-doc, missing-return-type-doc, missing-param-doc, missing-type-doc # noqa: E501
r"""A tuple representing a batch of whole episodes.
Data type for on-policy algorithms.
    A :class:`~EpisodeBatch` represents a batch of whole episodes, produced
    when one or more agents interact with one or more environments.
+-----------------------+-------------------------------------------------+
| Symbol | Description |
+=======================+=================================================+
| :math:`N` | Episode batch dimension |
+-----------------------+-------------------------------------------------+
| :math:`[T]` | Variable-length time dimension of each |
| | episode |
+-----------------------+-------------------------------------------------+
| :math:`S^*` | Single-step shape of a time-series tensor |
+-----------------------+-------------------------------------------------+
| :math:`N \bullet [T]` | A dimension computed by flattening a |
| | variable-length time dimension :math:`[T]` into |
| | a single batch dimension with length |
    |                       | :math:`\sum_{i \in N} [T]_i`                    |
+-----------------------+-------------------------------------------------+
Attributes:
env_spec (EnvSpec): Specification for the environment from
which this data was sampled.
episode_infos (dict[str, np.ndarray]): A dict of numpy arrays
containing the episode-level information of each episode. Each
value of this dict should be a numpy array of shape :math:`(N,
S^*)`. For example, in goal-conditioned reinforcement learning this
could contain the goal state for each episode.
observations (numpy.ndarray): A numpy array of shape
:math:`(N \bullet [T], O^*)` containing the (possibly
multi-dimensional) observations for all time steps in this batch.
These must conform to :obj:`EnvStep.observation_space`.
        last_observations (numpy.ndarray): A numpy array of shape
            :math:`(N, O^*)` containing the last observation of each episode.
            This is necessary since each episode has one more observation
            than it has actions.
actions (numpy.ndarray): A numpy array of shape
:math:`(N \bullet [T], A^*)` containing the (possibly
multi-dimensional) actions for all time steps in this batch. These
must conform to :obj:`EnvStep.action_space`.
rewards (numpy.ndarray): A numpy array of shape
:math:`(N \bullet [T])` containing the rewards for all time steps
in this batch.
        env_infos (dict[str, np.ndarray]): A dict of numpy arrays of
            arbitrary environment state information. Each value of this dict
            should be a numpy array of shape :math:`(N \bullet [T])` or
            :math:`(N \bullet [T], S^*)`.
        agent_infos (dict[str, np.ndarray]): A dict of numpy arrays of
            arbitrary agent state information. Each value of this dict
            should be a numpy array of shape :math:`(N \bullet [T])` or
            :math:`(N \bullet [T], S^*)`. For example, this may contain the
            hidden states from an RNN policy.
        step_types (numpy.ndarray): A numpy array of `StepType` with shape
            :math:`(N \bullet [T])` containing the time step types for all
            transitions in this batch.
lengths (numpy.ndarray): An integer numpy array of shape :math:`(N,)`
containing the length of each episode in this batch. This may be
used to reconstruct the individual episodes.
Raises:
ValueError: If any of the above attributes do not conform to their
prescribed types and shapes.
"""
episode_infos_by_episode: np.ndarray
last_observations: np.ndarray
lengths: np.ndarray
def __init__(self, env_spec, episode_infos, observations,
last_observations, actions, rewards, env_infos, agent_infos,
step_types, lengths): # noqa: D102
# lengths
        if len(lengths.shape) != 1:
            raise ValueError(
                f'lengths has shape {lengths.shape} but must be a tensor of '
                f'shape (N,)')
if not (lengths.dtype.kind == 'u' or lengths.dtype.kind == 'i'):
raise ValueError(
f'lengths has dtype {lengths.dtype}, but must have an '
f'integer dtype')
n_episodes = len(lengths)
# Check episode_infos and last_observations here instead of checking
# episode_infos and next_observations in check_timestep_batch.
for key, val in episode_infos.items():
if not isinstance(val, np.ndarray):
raise ValueError(
f'Entry {key!r} in episode_infos is of type {type(val)!r} '
f'but must be of type {np.ndarray!r}')
if hasattr(val, 'shape'):
if val.shape[0] != n_episodes:
raise ValueError(
f'Entry {key!r} in episode_infos has batch size '
f'{val.shape[0]}, but must have batch size '
f'{n_episodes} to match the number of episodes')
if not isinstance(last_observations, np.ndarray):
raise ValueError(
f'last_observations is not of type {np.ndarray!r}')
if last_observations.shape[0] != n_episodes:
raise ValueError(
f'last_observations has batch size '
f'{last_observations.shape[0]} but must have '
f'batch size {n_episodes} to match the number of episodes')
        if not _space_soft_contains(env_spec.observation_space,
                                    last_observations[0]):
            raise ValueError(
                f'Each last_observation must conform to the observation '
                f'space {env_spec.observation_space}, but got a '
                f'last_observation with shape '
                f'{last_observations[0].shape}')
object.__setattr__(self, 'last_observations', last_observations)
object.__setattr__(self, 'lengths', lengths)
object.__setattr__(self, 'env_spec', env_spec)
# Used to compute the episode_infos property, but also used in .split
object.__setattr__(self, 'episode_infos_by_episode', episode_infos)
object.__setattr__(self, 'observations', observations)
# No need for next_observations, it was replaced with a property
object.__setattr__(self, 'actions', actions)
object.__setattr__(self, 'rewards', rewards)
object.__setattr__(self, 'env_infos', env_infos)
object.__setattr__(self, 'agent_infos', agent_infos)
object.__setattr__(self, 'step_types', step_types)
check_timestep_batch(
self,
np.ndarray,
ignored_fields={'next_observations', 'episode_infos'})
    @classmethod
def concatenate(cls, *batches):
"""Create a EpisodeBatch by concatenating EpisodeBatches.
Args:
batches (list[EpisodeBatch]): Batches to concatenate.
Returns:
EpisodeBatch: The concatenation of the batches.
"""
if __debug__:
for b in batches:
assert (set(b.env_infos.keys()) == set(
batches[0].env_infos.keys()))
assert (set(b.agent_infos.keys()) == set(
batches[0].agent_infos.keys()))
env_infos = {
k: np.concatenate([b.env_infos[k] for b in batches])
for k in batches[0].env_infos.keys()
}
agent_infos = {
k: np.concatenate([b.agent_infos[k] for b in batches])
for k in batches[0].agent_infos.keys()
}
episode_infos = {
k: np.concatenate([b.episode_infos_by_episode[k] for b in batches])
for k in batches[0].episode_infos_by_episode.keys()
}
return cls(
episode_infos=episode_infos,
env_spec=batches[0].env_spec,
observations=np.concatenate(
[batch.observations for batch in batches]),
last_observations=np.concatenate(
[batch.last_observations for batch in batches]),
actions=np.concatenate([batch.actions for batch in batches]),
rewards=np.concatenate([batch.rewards for batch in batches]),
env_infos=env_infos,
agent_infos=agent_infos,
step_types=np.concatenate([batch.step_types for batch in batches]),
lengths=np.concatenate([batch.lengths for batch in batches]))
def _episode_ranges(self):
"""Iterate through start and stop indices for each episode.
Yields:
tuple[int, int]: Start index (inclusive) and stop index
(exclusive).
"""
start = 0
for length in self.lengths:
stop = start + length
yield (start, stop)
start = stop
    def split(self):
"""Split an EpisodeBatch into a list of EpisodeBatches.
The opposite of concatenate.
Returns:
list[EpisodeBatch]: A list of EpisodeBatches, with one
episode per batch.
"""
episodes = []
for i, (start, stop) in enumerate(self._episode_ranges()):
eps = EpisodeBatch(
env_spec=self.env_spec,
episode_infos=slice_nested_dict(self.episode_infos_by_episode,
i, i + 1),
observations=self.observations[start:stop],
last_observations=np.asarray([self.last_observations[i]]),
actions=self.actions[start:stop],
rewards=self.rewards[start:stop],
env_infos=slice_nested_dict(self.env_infos, start, stop),
agent_infos=slice_nested_dict(self.agent_infos, start, stop),
step_types=self.step_types[start:stop],
lengths=np.asarray([self.lengths[i]]))
episodes.append(eps)
return episodes
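
    # A hedged sketch: splitting and re-concatenating preserves the batch;
    # `batch` stands in for any valid EpisodeBatch:
    #
    #     >>> per_episode = batch.split()                   # doctest: +SKIP
    #     >>> len(per_episode) == len(batch.lengths)        # doctest: +SKIP
    #     True
    #     >>> EpisodeBatch.concatenate(*per_episode)        # doctest: +SKIP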
    def to_list(self):
"""Convert the batch into a list of dictionaries.
Returns:
list[dict[str, np.ndarray or dict[str, np.ndarray]]]: Keys:
* observations (np.ndarray): Non-flattened array of
observations. Has shape (T, S^*) (the unflattened state
space of the current environment). observations[i] was
used by the agent to choose actions[i].
* next_observations (np.ndarray): Non-flattened array of
observations. Has shape (T, S^*). next_observations[i] was
observed by the agent after taking actions[i].
* actions (np.ndarray): Non-flattened array of actions. Must
have shape (T, S^*) (the unflattened action space of the
current environment).
* rewards (np.ndarray): Array of rewards of shape (T,) (1D
array of length timesteps).
* agent_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `agent_info` arrays.
* env_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `env_info` arrays.
            * step_types (numpy.ndarray): A numpy array of `StepType` with
                shape (T,) containing the time step types for all
                transitions in this batch.
* episode_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `episode_info` arrays.
"""
episodes = []
for i, (start, stop) in enumerate(self._episode_ranges()):
episodes.append({
'episode_infos':
{k: v[i:i + 1]
for (k, v) in self.episode_infos.items()},
'observations':
self.observations[start:stop],
'next_observations':
np.concatenate((self.observations[1 + start:stop],
[self.last_observations[i]])),
'actions':
self.actions[start:stop],
'rewards':
self.rewards[start:stop],
'env_infos':
{k: v[start:stop]
for (k, v) in self.env_infos.items()},
'agent_infos':
{k: v[start:stop]
for (k, v) in self.agent_infos.items()},
'step_types':
self.step_types[start:stop]
})
return episodes
    @classmethod
def from_list(cls, env_spec, paths):
"""Create a EpisodeBatch from a list of episodes.
Args:
env_spec (EnvSpec): Specification for the environment from which
this data was sampled.
paths (list[dict[str, np.ndarray or dict[str, np.ndarray]]]): Keys:
* episode_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `episode_info` arrays, each of shape (S^*).
* observations (np.ndarray): Non-flattened array of
observations. Typically has shape (T, S^*) (the unflattened
state space of the current environment). observations[i]
was used by the agent to choose actions[i]. observations
may instead have shape (T + 1, S^*).
            * next_observations (np.ndarray): Non-flattened array of
                observations. Has shape (T, S^*). next_observations[i] was
                observed by the agent after taking actions[i]. Optional.
                Note that to preserve all information from the environment,
                each path's observations must have shape (T + 1, S^*), or
                this key must be set. However, this method is lenient and
                will "duplicate" the last observation if the original last
                observation has been lost.
* actions (np.ndarray): Non-flattened array of actions. Must
have shape (T, S^*) (the unflattened action space of the
current environment).
* rewards (np.ndarray): Array of rewards of shape (T,) (1D
array of length timesteps).
* agent_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `agent_info` arrays.
* env_infos (dict[str, np.ndarray]): Dictionary of stacked,
non-flattened `env_info` arrays.
            * step_types (numpy.ndarray): A numpy array of `StepType` with
                shape (T,) containing the time step types for all
                transitions in this batch.
"""
lengths = np.asarray([len(p['rewards']) for p in paths])
if all(
len(path['observations']) == length + 1
for (path, length) in zip(paths, lengths)):
last_observations = np.asarray(
[p['observations'][-1] for p in paths])
observations = np.concatenate(
[p['observations'][:-1] for p in paths])
else:
# The number of observations and timesteps must match.
observations = np.concatenate([p['observations'] for p in paths])
if paths[0].get('next_observations') is not None:
last_observations = np.asarray(
[p['next_observations'][-1] for p in paths])
else:
last_observations = np.asarray(
[p['observations'][-1] for p in paths])
stacked_paths = concat_tensor_dict_list(paths)
episode_infos = stack_tensor_dict_list(
[path['episode_infos'] for path in paths])
# Temporary solution. This logic is not needed if algorithms process
# step_types instead of dones directly.
if 'dones' in stacked_paths and 'step_types' not in stacked_paths:
step_types = np.array([
StepType.TERMINAL if done else StepType.MID
for done in stacked_paths['dones']
],
dtype=StepType)
stacked_paths['step_types'] = step_types
del stacked_paths['dones']
return cls(env_spec=env_spec,
episode_infos=episode_infos,
observations=observations,
last_observations=last_observations,
actions=stacked_paths['actions'],
rewards=stacked_paths['rewards'],
env_infos=stacked_paths['env_infos'],
agent_infos=stacked_paths['agent_infos'],
step_types=stacked_paths['step_types'],
lengths=lengths)
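
    # A hedged sketch of from_list() with a single two-step path; `env_spec`
    # is a hypothetical EnvSpec whose observation space has shape (4,), and
    # the arrays below are placeholder data, not real samples:
    #
    #     >>> path = dict(
    #     ...     episode_infos={},
    #     ...     observations=np.zeros((3, 4)),  # T + 1 observations
    #     ...     actions=np.zeros((2, 2)),
    #     ...     rewards=np.zeros(2),
    #     ...     env_infos={},
    #     ...     agent_infos={},
    #     ...     step_types=np.array([StepType.FIRST, StepType.TERMINAL],
    #     ...                         dtype=StepType))
    #     >>> batch = EpisodeBatch.from_list(env_spec, [path])  # doctest: +SKIP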
@property
def next_observations(self):
r"""Get the observations seen after actions are performed.
In an :class:`~EpisodeBatch`, next_observations don't need to be stored
explicitly, since the next observation is already stored in
the batch.
Returns:
np.ndarray: The "next_observations" with shape
:math:`(N \bullet [T], O^*)`
"""
return np.concatenate(
tuple([
np.concatenate((eps.observations[1:], eps.last_observations))
for eps in self.split()
]))
@property
def episode_infos(self):
r"""Get the episode_infos.
In an :class:`~EpisodeBatch`, episode_infos only need to be stored once
per episode. However, the episode_infos field of
:class:`~TimeStepBatch` has shape :math:`(N \bullet [T])`. This method
expands episode_infos_by_episode (which have shape :math:`(N)`) to
:math:`(N \bullet [T])`.
Returns:
dict[str, np.ndarray]: The episode_infos each of length :math:`(N
\bullet [T])`.
"""
return {
key: np.concatenate([
np.repeat([v], length, axis=0)
for (v, length) in zip(val, self.lengths)
])
for (key, val) in self.episode_infos_by_episode.items()
}
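
    # A hedged numeric illustration of the expansion above, with
    # hypothetical per-episode values and lengths = [2, 3]:
    #
    #     >>> np.concatenate([np.repeat([v], length, axis=0)
    #     ...                 for (v, length) in zip(np.array([[0.], [1.]]),
    #     ...                                        [2, 3])])
    #     array([[0.],
    #            [0.],
    #            [1.],
    #            [1.],
    #            [1.]])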
@property
def padded_observations(self):
"""Padded observations.
Returns:
np.ndarray: Padded observations with shape of
:math:`(N, max_episode_length, O^*)`.
"""
return pad_batch_array(self.observations, self.lengths,
self.env_spec.max_episode_length)
@property
def padded_actions(self):
"""Padded actions.
Returns:
np.ndarray: Padded actions with shape of
:math:`(N, max_episode_length, A^*)`.
"""
return pad_batch_array(self.actions, self.lengths,
self.env_spec.max_episode_length)
@property
def observations_list(self):
"""Split observations into a list.
Returns:
            list[np.ndarray]: The observations, split per episode.
"""
obs_list = []
for start, stop in self._episode_ranges():
obs_list.append(self.observations[start:stop])
return obs_list
@property
def actions_list(self):
"""Split actions into a list.
Returns:
            list[np.ndarray]: The actions, split per episode.
"""
acts_list = []
for start, stop in self._episode_ranges():
acts_list.append(self.actions[start:stop])
return acts_list
@property
def padded_rewards(self):
"""Padded rewards.
Returns:
np.ndarray: Padded rewards with shape of
:math:`(N, max_episode_length)`.
"""
return pad_batch_array(self.rewards, self.lengths,
self.env_spec.max_episode_length)
@property
def valids(self):
"""An array indicating valid steps in a padded tensor.
Returns:
np.ndarray: the shape is :math:`(N, max_episode_length)`.
"""
return pad_batch_array(np.ones_like(self.rewards), self.lengths,
self.env_spec.max_episode_length)
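
    # A hedged illustration of the padded layout: with hypothetical
    # lengths = [2, 3] and max_episode_length = 4, valids would be
    #
    #     [[1., 1., 0., 0.],
    #      [1., 1., 1., 0.]]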
@property
def padded_next_observations(self):
"""Padded next_observations array.
Returns:
np.ndarray: Array of shape :math:`(N, max_episode_length, O^*)`
"""
return pad_batch_array(self.next_observations, self.lengths,
self.env_spec.max_episode_length)
@property
def padded_step_types(self):
"""Padded step_type array.
Returns:
np.ndarray: Array of shape :math:`(N, max_episode_length)`
"""
return pad_batch_array(self.step_types, self.lengths,
self.env_spec.max_episode_length)
@property
def padded_agent_infos(self):
"""Padded agent infos.
Returns:
dict[str, np.ndarray]: Padded agent infos. Each value must have
shape with :math:`(N, max_episode_length)` or
:math:`(N, max_episode_length, S^*)`.
"""
return {
k: pad_batch_array(arr, self.lengths,
self.env_spec.max_episode_length)
for (k, arr) in self.agent_infos.items()
}
@property
def padded_env_infos(self):
"""Padded env infos.
Returns:
dict[str, np.ndarray]: Padded env infos. Each value must have
shape with :math:`(N, max_episode_length)` or
:math:`(N, max_episode_length, S^*)`.
"""
return {
k: pad_batch_array(arr, self.lengths,
self.env_spec.max_episode_length)
for (k, arr) in self.env_infos.items()
}
def _space_soft_contains(space, element):
"""Check that a space has the same dimensionality as an element.
If the space's dimensionality is not available, check that the space
contains the element.
Args:
space (akro.Space or gym.Space): Space to check
element (object): Element to check in space.
Returns:
        bool: True iff the element "matches" the space.
"""
if space.contains(element):
return True
elif hasattr(space, 'flat_dim'):
return space.flat_dim == np.prod(element.shape)
else:
return False
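
# A hedged illustration with a hypothetical akro.Box space: a flat element
# of matching dimensionality "soft-matches" even if it leaves the bounds.
#
#     >>> import akro                                       # doctest: +SKIP
#     >>> space = akro.Box(low=-1, high=1, shape=(3,))      # doctest: +SKIP
#     >>> _space_soft_contains(space, np.zeros(3))          # doctest: +SKIP
#     True
#     >>> _space_soft_contains(space, 2 * np.ones(3))       # doctest: +SKIP
#     True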
def check_timestep_batch(batch, array_type, ignored_fields=()):
"""Check a TimeStepBatch of any array type that has .shape.
Args:
batch (TimeStepBatch): Batch of timesteps.
array_type (type): Array type.
ignored_fields (set[str]): Set of fields to ignore checking on.
Raises:
ValueError: If an invariant of TimeStepBatch is broken.
"""
# pylint:disable=too-many-branches
    fields = {
        field: getattr(batch, field)
        for field in [
            'env_spec', 'rewards', 'observations', 'actions',
            'next_observations', 'step_types', 'agent_infos', 'episode_infos',
            'env_infos'
        ] if field not in ignored_fields
    }
env_spec = fields.get('env_spec', None)
inferred_batch_size = None
inferred_batch_size_field = None
for field, value in fields.items():
if field in [
'observations', 'actions', 'rewards', 'next_observations',
'step_types'
]:
if not isinstance(value, array_type):
raise ValueError(f'{field} is not of type {array_type!r}')
if hasattr(value, 'shape'):
if inferred_batch_size is None:
inferred_batch_size = value.shape[0]
inferred_batch_size_field = field
elif value.shape[0] != inferred_batch_size:
raise ValueError(
f'{field} has batch size {value.shape[0]}, but '
f'must have batch size {inferred_batch_size} '
f'to match {inferred_batch_size_field}')
if env_spec and field in ['observations', 'next_observations']:
if not _space_soft_contains(env_spec.observation_space,
value[0]):
raise ValueError(
f'Each {field[:-1]} has shape {value[0].shape} '
f'but must match the observation_space '
f'{env_spec.observation_space}')
if (isinstance(value[0], np.ndarray)
and not env_spec.observation_space.contains(value[0])):
warnings.warn(
f'Observation {value[0]!r} is outside '
f'observation_space {env_spec.observation_space}')
if env_spec and field == 'actions':
if not _space_soft_contains(env_spec.action_space, value[0]):
raise ValueError(
f'Each {field[:-1]} has shape {value[0].shape} '
f'but must match the action_space '
f'{env_spec.action_space}')
if field in ['rewards', 'step_types']:
if value.shape != (inferred_batch_size, ):
raise ValueError(f'{field} has shape {value.shape} '
f'but must have batch size '
f'{inferred_batch_size} to match '
f'{inferred_batch_size_field}')
if field in ['agent_infos', 'env_infos', 'episode_infos']:
for key, val in value.items():
if not isinstance(val, (array_type, dict)):
raise ValueError(
                        f'Entry {key!r} in {field} is of type {type(val)} '
f'but must be {array_type!r} or dict')
if hasattr(val, 'shape'):
if val.shape[0] != inferred_batch_size:
raise ValueError(
f'Entry {key!r} in {field} has batch size '
f'{val.shape[0]} but must have batch size '
f'{inferred_batch_size} to match '
f'{inferred_batch_size_field}')
        if (field == 'step_types' and isinstance(value, np.ndarray)
                # Only numpy arrays support custom dtypes.
                and value.dtype != StepType):
raise ValueError(
f'step_types has dtype {value.dtype} but must have '
f'dtype StepType')
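
# A hedged sketch: the same checker can validate batches of other array
# types that expose .shape, e.g. torch tensors, by passing a different
# `array_type`; `torch_batch` is a hypothetical batch of torch.Tensors:
#
#     >>> import torch                                      # doctest: +SKIP
#     >>> check_timestep_batch(torch_batch, torch.Tensor)   # doctest: +SKIP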