Source code for garage._dtypes

"""Data types for agent-based learning."""
import collections
import enum

import akro
import numpy as np

from import (concat_tensor_dict_list, slice_nested_dict,

# pylint: disable=too-many-lines

class EpisodeBatch(
        collections.namedtuple('EpisodeBatch', [
    # pylint: disable=missing-return-doc, missing-return-type-doc, missing-param-doc, missing-type-doc  # noqa: E501
    r"""A tuple representing a batch of whole episodes.

    Data type for on-policy algorithms.

    A :class:`~EpisodeBatch` represents a batch of whole episodes, produced
    when one or more agents interacts with one or more environments.

    | Symbol                | Description                                     |
    | :math:`N`             | Episode batch dimension                         |
    | :math:`[T]`           | Variable-length time dimension of each          |
    |                       | episode                                         |
    | :math:`S^*`           | Single-step shape of a time-series tensor       |
    | :math:`N \bullet [T]` | A dimension computed by flattening a            |
    |                       | variable-length time dimension :math:`[T]` into |
    |                       | a single batch dimension with length            |
    |                       | :math:`sum_{i \in N} [T]_i`                     |

        env_spec (EnvSpec): Specification for the environment from
            which this data was sampled.
        episode_infos (dict[str, np.ndarray]): A dict of numpy arrays
            containing the episode-level information of each episode. Each
            value of this dict should be a numpy array of shape :math:`(N,
            S^*)`. For example, in goal-conditioned reinforcement learning this
            could contain the goal state for each episode.
        observations (numpy.ndarray): A numpy array of shape
            :math:`(N \bullet [T], O^*)` containing the (possibly
            multi-dimensional) observations for all time steps in this batch.
            These must conform to :obj:`EnvStep.observation_space`.
        last_observations (numpy.ndarray): A numpy array of shape
            :math:`(N, O^*)` containing the last observation of each episode.
            This is necessary since there are one more observations than
            actions every episode.
        actions (numpy.ndarray): A  numpy array of shape
            :math:`(N \bullet [T], A^*)` containing the (possibly
            multi-dimensional) actions for all time steps in this batch. These
            must conform to :obj:`EnvStep.action_space`.
        rewards (numpy.ndarray): A numpy array of shape
            :math:`(N \bullet [T])` containing the rewards for all time steps
            in this batch.
        env_infos (dict[str, np.ndarray]): A dict of numpy arrays arbitrary
            environment state information. Each value of this dict should be
            a numpy array of shape :math:`(N \bullet [T])` or :math:`(N \bullet
            [T], S^*)`.
        agent_infos (dict[str, np.ndarray]): A dict of numpy arrays arbitrary
            agent state information. Each value of this dict should be a numpy
            array of shape :math:`(N \bullet [T])` or :math:`(N \bullet [T],
            S^*)`.  For example, this may contain the hidden states from an RNN
        step_types (numpy.ndarray): A numpy array of `StepType with shape
            :math:`(N,)` containing the time step types for all transitions in
            this batch.
        lengths (numpy.ndarray): An integer numpy array of shape :math:`(N,)`
            containing the length of each episode in this batch. This may be
            used to reconstruct the individual episodes.

        ValueError: If any of the above attributes do not conform to their
            prescribed types and shapes.

    __slots__ = ()

    def __new__(cls, env_spec, episode_infos, observations, last_observations,
                actions, rewards, env_infos, agent_infos, step_types,
                lengths):  # noqa: D102
        # pylint: disable=too-many-branches

        first_observation = observations[0]
        first_action = actions[0]
        inferred_batch_size = lengths.sum()

        # lengths
        if len(lengths.shape) != 1:
            raise ValueError(
                'Lengths tensor must be a tensor of shape (N,), but got a '
                'tensor of shape {} instead'.format(lengths.shape))

        if not (lengths.dtype.kind == 'u' or lengths.dtype.kind == 'i'):
            raise ValueError(
                'Lengths tensor must have an integer dtype, but got dtype {} '

        # observations
        if not env_spec.observation_space.contains(first_observation):
            # Discrete actions can be either in the space normally, or one-hot
            # encoded.
            if isinstance(env_spec.observation_space,
                          (akro.Box, akro.Discrete, akro.Dict)):
                if env_spec.observation_space.flat_dim !=
                    raise ValueError('observations should have the same '
                                     'dimensionality as the observation_space '
                                     '({}), but got data with shape {} '
                raise ValueError(
                    'observations must conform to observation_space {}, but '
                    'got data with shape {} instead.'.format(
                        env_spec.observation_space, first_observation))

        if observations.shape[0] != inferred_batch_size:
            raise ValueError(
                'Expected batch dimension of observations to be length {}, '
                'but got length {} instead.'.format(inferred_batch_size,

        # observations
        if not env_spec.observation_space.contains(last_observations[0]):
            # Discrete actions can be either in the space normally, or one-hot
            # encoded.
            if isinstance(env_spec.observation_space,
                          (akro.Box, akro.Discrete, akro.Dict)):
                if env_spec.observation_space.flat_dim !=
                    raise ValueError('last_observations should have the same '
                                     'dimensionality as the observation_space '
                                     '({}), but got data with shape {} '
                raise ValueError(
                    'last_observations must conform to observation_space {}, '
                    'but got data with shape {} instead.'.format(
                        env_spec.observation_space, last_observations[0]))

        if last_observations.shape[0] != len(lengths):
            raise ValueError(
                'Expected batch dimension of last_observations to be length '
                '{}, but got length {} instead.'.format(
                    len(lengths), last_observations.shape[0]))

        # actions
        if not env_spec.action_space.contains(first_action):
            # Discrete actions can be either in the space normally, or one-hot
            # encoded.
            if isinstance(env_spec.action_space,
                          (akro.Box, akro.Discrete, akro.Dict)):
                if env_spec.action_space.flat_dim !=
                    raise ValueError('actions should have the same '
                                     'dimensionality as the action_space '
                                     '({}), but got data with shape {} '
                raise ValueError(
                    'actions must conform to action_space {}, but got data '
                    'with shape {} instead.'.format(env_spec.action_space,

        if actions.shape[0] != inferred_batch_size:
            raise ValueError(
                'Expected batch dimension of actions to be length {}, but got '
                'length {} instead.'.format(inferred_batch_size,

        # rewards
        if rewards.shape != (inferred_batch_size, ):
            raise ValueError(
                'Rewards tensor must have shape {}, but got shape {} '
                'instead.'.format(inferred_batch_size, rewards.shape))

        # env_infos
        for key, val in env_infos.items():
            if not isinstance(val, (dict, np.ndarray)):
                raise ValueError(
                    'Each entry in env_infos must be a numpy array or '
                    'dictionary, but got key {} with value type {} instead.'.
                    format(key, type(val)))

            if (isinstance(val, np.ndarray)
                    and val.shape[0] != inferred_batch_size):
                raise ValueError(
                    'Each entry in env_infos must have a batch dimension of '
                    'length {}, but got key {} with batch size {} instead.'.
                    format(inferred_batch_size, key, val.shape[0]))

        # agent_infos
        for key, val in agent_infos.items():
            if not isinstance(val, (dict, np.ndarray)):
                raise ValueError(
                    'Each entry in agent_infos must be a numpy array or '
                    'dictionary, but got key {} with value type {} instead.'
                    'instead'.format(key, type(val)))

            if (isinstance(val, np.ndarray)
                    and val.shape[0] != inferred_batch_size):
                raise ValueError(
                    'Each entry in agent_infos must have a batch dimension of '
                    'length {}, but got key {} with batch size {} instead.'.
                    format(inferred_batch_size, key, val.shape[0]))

        # step_types
        if step_types.shape != (inferred_batch_size, ):
            raise ValueError(
                'step_types tensor must have shape {}, but got shape {} '
                'instead.'.format(inferred_batch_size, step_types.shape))

        if step_types.dtype != StepType:
            raise ValueError(
                'step_types tensor must be dtype `StepType`, but got tensor '
                'of dtype {} instead.'.format(step_types.dtype))

        # episode_infos
        for key, val in episode_infos.items():
            if not isinstance(val, (dict, np.ndarray)):
                raise ValueError(
                    'Each entry in episode_infos must be a numpy array,'
                    'but got key {} with value type {} instead.'.format(
                        key, type(val)))
            if (isinstance(val, np.ndarray) and val.shape[0] != len(lengths)):
                raise ValueError(
                    'Each entry in episode_infos must have a batch dimension '
                    'of length {}, but got key {} with batch size {} instead.'.
                    format(len(lengths), key, val.shape[0]))

        return super().__new__(EpisodeBatch, env_spec, episode_infos,
                               observations, last_observations, actions,
                               rewards, env_infos, agent_infos, step_types,

[docs] @classmethod def concatenate(cls, *batches): """Create a EpisodeBatch by concatenating EpisodeBatches. Args: batches (list[EpisodeBatch]): Batches to concatenate. Returns: EpisodeBatch: The concatenation of the batches. """ if __debug__: for b in batches: assert (set(b.env_infos.keys()) == set( batches[0].env_infos.keys())) assert (set(b.agent_infos.keys()) == set( batches[0].agent_infos.keys())) episode_infos = { k: np.concatenate([b.episode_infos[k] for b in batches]) for k in batches[0].episode_infos.keys() } env_infos = { k: np.concatenate([b.env_infos[k] for b in batches]) for k in batches[0].env_infos.keys() } agent_infos = { k: np.concatenate([b.agent_infos[k] for b in batches]) for k in batches[0].agent_infos.keys() } episode_infos = { k: np.concatenate([b.episode_infos[k] for b in batches]) for k in batches[0].episode_infos.keys() } return cls( episode_infos=episode_infos, env_spec=batches[0].env_spec, observations=np.concatenate( [batch.observations for batch in batches]), last_observations=np.concatenate( [batch.last_observations for batch in batches]), actions=np.concatenate([batch.actions for batch in batches]), rewards=np.concatenate([batch.rewards for batch in batches]), env_infos=env_infos, agent_infos=agent_infos, step_types=np.concatenate([batch.step_types for batch in batches]), lengths=np.concatenate([batch.lengths for batch in batches]))
[docs] def split(self): """Split an EpisodeBatch into a list of EpisodeBatches. The opposite of concatenate. Returns: list[EpisodeBatch]: A list of EpisodeBatches, with one episode per batch. """ episodes = [] start = 0 for i, length in enumerate(self.lengths): stop = start + length eps = EpisodeBatch( env_spec=self.env_spec, episode_infos=slice_nested_dict(self.episode_infos, i, i + 1), observations=self.observations[start:stop], last_observations=np.asarray([self.last_observations[i]]), actions=self.actions[start:stop], rewards=self.rewards[start:stop], env_infos=slice_nested_dict(self.env_infos, start, stop), agent_infos=slice_nested_dict(self.agent_infos, start, stop), step_types=self.step_types[start:stop], lengths=np.asarray([length])) episodes.append(eps) start = stop return episodes
[docs] def to_list(self): """Convert the batch into a list of dictionaries. Returns: list[dict[str, np.ndarray or dict[str, np.ndarray]]]: Keys: * observations (np.ndarray): Non-flattened array of observations. Has shape (T, S^*) (the unflattened state space of the current environment). observations[i] was used by the agent to choose actions[i]. * next_observations (np.ndarray): Non-flattened array of observations. Has shape (T, S^*). next_observations[i] was observed by the agent after taking actions[i]. * actions (np.ndarray): Non-flattened array of actions. Should have shape (T, S^*) (the unflattened action space of the current environment). * rewards (np.ndarray): Array of rewards of shape (T,) (1D array of length timesteps). * agent_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `agent_info` arrays. * env_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `env_info` arrays. * step_types (numpy.ndarray): A numpy array of `StepType with shape (T,) containing the time step types for all transitions in this batch. * episode_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `episode_info` arrays. """ start = 0 episodes = [] for i, length in enumerate(self.lengths): stop = start + length episodes.append({ 'episode_infos': {k: v[i:i + 1] for (k, v) in self.episode_infos.items()}, 'observations': self.observations[start:stop], 'next_observations': np.concatenate((self.observations[1 + start:stop], [self.last_observations[i]])), 'actions': self.actions[start:stop], 'rewards': self.rewards[start:stop], 'env_infos': {k: v[start:stop] for (k, v) in self.env_infos.items()}, 'agent_infos': {k: v[start:stop] for (k, v) in self.agent_infos.items()}, 'step_types': self.step_types[start:stop] }) start = stop return episodes
[docs] @classmethod def from_list(cls, env_spec, paths): """Create a EpisodeBatch from a list of episodes. Args: env_spec (EnvSpec): Specification for the environment from which this data was sampled. paths (list[dict[str, np.ndarray or dict[str, np.ndarray]]]): Keys: * episode_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `episode_info` arrays, each of shape (S^*). * observations (np.ndarray): Non-flattened array of observations. Typically has shape (T, S^*) (the unflattened state space of the current environment). observations[i] was used by the agent to choose actions[i]. observations may instead have shape (T + 1, S^*). * next_observations (np.ndarray): Non-flattened array of observations. Has shape (T, S^*). next_observations[i] was observed by the agent after taking actions[i]. Optional. Note that to ensure all information from the environment was preserved, observations[i] should have shape (T + 1, S^*), or this key should be set. However, this method is lenient and will "duplicate" the last observation if the original last observation has been lost. * actions (np.ndarray): Non-flattened array of actions. Should have shape (T, S^*) (the unflattened action space of the current environment). * rewards (np.ndarray): Array of rewards of shape (T,) (1D array of length timesteps). * agent_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `agent_info` arrays. * env_infos (dict[str, np.ndarray]): Dictionary of stacked, non-flattened `env_info` arrays. * step_types (numpy.ndarray): A numpy array of `StepType with shape (T,) containing the time step types for all transitions in this batch. """ lengths = np.asarray([len(p['rewards']) for p in paths]) if all( len(path['observations']) == length + 1 for (path, length) in zip(paths, lengths)): last_observations = np.asarray( [p['observations'][-1] for p in paths]) observations = np.concatenate( [p['observations'][:-1] for p in paths]) else: # The number of observations and timesteps must match. observations = np.concatenate([p['observations'] for p in paths]) if paths[0].get('next_observations') is not None: last_observations = np.asarray( [p['next_observations'][-1] for p in paths]) else: last_observations = np.asarray( [p['observations'][-1] for p in paths]) stacked_paths = concat_tensor_dict_list(paths) episode_infos = stack_tensor_dict_list( [path['episode_infos'] for path in paths]) # Temporary solution. This logic is not needed if algorithms process # step_types instead of dones directly. if 'dones' in stacked_paths and 'step_types' not in stacked_paths: step_types = np.array([ StepType.TERMINAL if done else StepType.MID for done in stacked_paths['dones'] ], dtype=StepType) stacked_paths['step_types'] = step_types del stacked_paths['dones'] return cls(env_spec=env_spec, episode_infos=episode_infos, observations=observations, last_observations=last_observations, actions=stacked_paths['actions'], rewards=stacked_paths['rewards'], env_infos=stacked_paths['env_infos'], agent_infos=stacked_paths['agent_infos'], step_types=stacked_paths['step_types'], lengths=lengths)
@property def next_observations(self): """Get the observations seen after actions are performed. Usually, in an :class:`~EpisodeBatch`, next_observations don't need to be stored explicitly, since the next observation is already stored in the batch. Returns: np.ndarray: The "next_observations". """ return np.concatenate( tuple([ np.concatenate((eps.observations[1:], eps.last_observations)) for eps in self.split() ])) @property def padded_observations(self): """Padded observations. Returns: np.ndarray: Padded observations with shape of :math:`(N, max_episode_length, O^*)`. """ return self.pad_to_last(self.observations) @property def padded_actions(self): """Padded actions. Returns: np.ndarray: Padded actions with shape of :math:`(N, max_episode_length, A^*)`. """ return self.pad_to_last(self.actions) @property def observations_list(self): """Split observations into a list. Returns: list[np.ndarray]: Splitted list. """ start = 0 obs_list = [] for length in self.lengths: stop = start + length obs_list.append(self.observations[start:stop]) start = stop return obs_list @property def actions_list(self): """Split actions into a list. Returns: list[np.ndarray]: Splitted list. """ start = 0 acts_list = [] for length in self.lengths: stop = start + length acts_list.append(self.actions[start:stop]) start = stop return acts_list @property def padded_rewards(self): """Padded rewards. Returns: np.ndarray: Padded rewards with shape of :math:`(N, max_episode_length)`. """ return self.pad_to_last(self.rewards) @property def valids(self): """An array indicating valid steps in a padded tensor. Returns: np.ndarray: the shape is :math:`(N, max_episode_length)`. """ valids = [] for length in self.lengths: ones = np.ones(length) n_zeros = max(self.env_spec.max_episode_length - length, 0) zeros = np.zeros(n_zeros) valids.append(np.concatenate((ones, zeros))) return np.asarray(valids) @property def padded_agent_infos(self): """Padded agent infos. Returns: dict[str, np.ndarray]: Padded agent infos. Each value should have shape with :math:`(N, max_episode_length)` or :math:`(N, max_episode_length, S^*)`. """ info_keys = self.agent_infos.keys() agent_infos = {k: [] for k in info_keys} for key in info_keys: agent_infos[key] = self.pad_to_last(self.agent_infos[key]) return agent_infos
[docs] def pad_to_last(self, input_array): """Pad tensors with zeros. Args: input_array (np.ndarray): Tensors to be padded. Return: numpy.ndarray: Padded tensor with shape of :math:`(N, max_episode_length)` or :math:`(N, max_episode_length, S^*)`. """ start = 0 padded_array = [] for length in self.lengths: stop = start + length pad_witdh = np.zeros((len(input_array.shape), 2), dtype=np.int32) pad_witdh[0][1] = max(self.env_spec.max_episode_length - length, 0) padded_array.append(np.pad(input_array[start:stop], pad_witdh)) start = stop return np.asarray(padded_array)
[docs]class StepType(enum.IntEnum): """Defines the status of a :class:`~TimeStep` within a sequence. Note that the last :class:`~TimeStep` in a sequence can either be :attribute:`StepType.TERMINAL` or :attribute:`StepType.TIMEOUT`. Suppose max_episode_length = 5: * A success sequence terminated at step 4 will look like: FIRST, MID, MID, TERMINAL * A success sequence terminated at step 5 will look like: FIRST, MID, MID, MID, TERMINAL * An unsuccessful sequence truncated by time limit will look like: FIRST, MID, MID, MID, TIMEOUT """ # Denotes the first :class:`~TimeStep` in a sequence. FIRST = 0 # Denotes any :class:`~TimeStep` in the middle of a sequence (i.e. not the # first or last one). MID = 1 # Denotes the last :class:`~TimeStep` in a sequence that terminates # successfully. TERMINAL = 2 # Denotes the last :class:`~TimeStep` in a sequence truncated by time # limit. TIMEOUT = 3
[docs] @classmethod def get_step_type(cls, step_cnt, max_episode_length, done): """Determines the step type based on step cnt and done signal. Args: step_cnt (int): current step cnt of the environment. max_episode_length (int): maximum episode length. done (bool): the done signal returned by Environment. Returns: StepType: the step type. Raises: ValueError: if step_cnt is < 1. In this case a environment's `reset()` is likely not called yet and the step_cnt is None. """ if max_episode_length is not None and step_cnt >= max_episode_length: return StepType.TIMEOUT elif done: return StepType.TERMINAL elif step_cnt == 1: return StepType.FIRST elif step_cnt < 1: raise ValueError('Expect step_cnt to be >= 1, but got {} ' 'instead. Did you forget to call `reset(' ')`?'.format(step_cnt)) else: return StepType.MID
[docs]class TimeStep( collections.namedtuple('TimeStep', [ 'env_spec', 'episode_info', 'observation', 'action', 'reward', 'next_observation', 'env_info', 'agent_info', 'step_type' ])): # pylint: disable=missing-return-doc, missing-return-type-doc, missing-param-doc, missing-type-doc # noqa: E501 r"""A tuple representing a single TimeStep. A :class:`~TimeStep` represents a single sample when an agent interacts with an environment. It describes as SARS (State–action–reward–state) tuple that characterizes the evolution of a MDP. Attributes: env_spec (EnvSpec): Specification for the environment from which this data was sampled. episode_info (dict[str, np.ndarray]): A dict of numpy arrays of shape :math:`(S*^,)` containing episode-level information of each episode. For example, in goal-conditioned reinforcement learning this could contain the goal state for each episode. observation (numpy.ndarray): A numpy array of shape :math:`(O^*)` containing the observation for this time step in the environment. These must conform to :obj:`EnvStep.observation_space`. The observation before applying the action. `None` if `step_type` is `StepType.FIRST`, i.e. at the start of a sequence. action (numpy.ndarray): A numpy array of shape :math:`(A^*)` containing the action for this time step. These must conform to :obj:`EnvStep.action_space`. `None` if `step_type` is `StepType.FIRST`, i.e. at the start of a sequence. reward (float): A float representing the reward for taking the action given the observation, at this time step. `None` if `step_type` is `StepType.FIRST`, i.e. at the start of a sequence. next_observation (numpy.ndarray): A numpy array of shape :math:`(O^*)` containing the observation for this time step in the environment. These must conform to :obj:`EnvStep.observation_space`. The observation after applying the action. env_info (dict): A dict arbitrary environment state information. agent_info (dict): A dict of arbitrary agent state information. For example, this may contain the hidden states from an RNN policy. step_type (StepType): a :class:`~StepType` enum value. Can be one of :attribute:`~StepType.FIRST`, :attribute:`~StepType.MID`, :attribute:`~StepType.TERMINAL`, or :attribute:`~StepType.TIMEOUT`. """ @property def first(self): """bool: Whether this step is the first of its episode.""" return self.step_type is StepType.FIRST @property def mid(self): """bool: Whether this step is in the middle of its episode.""" return self.step_type is StepType.MID @property def terminal(self): """bool: Whether this step records a termination condition.""" return self.step_type is StepType.TERMINAL @property def timeout(self): """bool: Whether this step records a timeout condition.""" return self.step_type is StepType.TIMEOUT @property def last(self): """bool: Whether this step is the last of its episode.""" return self.step_type is StepType.TERMINAL or self.step_type \ is StepType.TIMEOUT
[docs] @classmethod def from_env_step(cls, env_step, last_observation, agent_info, episode_info): """Create a TimeStep from a EnvStep. Args: env_step (EnvStep): the env step returned by the environment. last_observation (numpy.ndarray): A numpy array of shape :math:`(O^*)` containing the observation for this time step in the environment. These must conform to :attr:`EnvStep.observation_space`. The observation before applying the action. agent_info (dict): A dict of arbitrary agent state information. Returns: TimeStep: The TimeStep with all information of EnvStep plus the agent info. """ return cls(env_spec=env_step.env_spec, episode_info=episode_info, observation=last_observation, action=env_step.action, reward=env_step.reward, next_observation=env_step.observation, env_info=env_step.env_info, agent_info=agent_info, step_type=env_step.step_type)
[docs]class InOutSpec: """Describes the input and output spaces of a primitive or module. Args: input_space (akro.Space): Input space of a module. output_space (akro.Space): Output space of a module. """ def __init__(self, input_space, output_space): self._input_space = input_space self._output_space = output_space @property def input_space(self): """Get input space of the module. Returns: akro.Space: Input space of the module. """ return self._input_space @property def output_space(self): """Get output space of the module. Returns: akro.Space: Output space of the module. """ return self._output_space
[docs]class TimeStepBatch( collections.namedtuple('TimeStepBatch', [ 'env_spec', 'episode_infos', 'observations', 'actions', 'rewards', 'next_observations', 'env_infos', 'agent_infos', 'step_types' ])): # pylint: disable=missing-param-doc, missing-type-doc """A tuple representing a batch of TimeSteps. Data type for off-policy algorithms, imitation learning and batch-RL. Attributes: env_spec (EnvSpec): Specification for the environment from which this data was sampled. episode_infos (dict[str, np.ndarray]): A dict of numpy arrays containing the episode-level information of each episode. Each value of this dict should be a numpy array of shape :math:`(N, S^*)`. For example, in goal-conditioned reinforcement learning this could contain the goal state for each episode. observations (numpy.ndarray): Non-flattened array of observations. Typically has shape (batch_size, S^*) (the unflattened state space of the current environment). actions (numpy.ndarray): Non-flattened array of actions. Should have shape (batch_size, S^*) (the unflattened action space of the current environment). rewards (numpy.ndarray): Array of rewards of shape (batch_size, 1). next_observation (numpy.ndarray): Non-flattened array of next observations. Has shape (batch_size, S^*). next_observations[i] was observed by the agent after taking actions[i]. env_infos (dict): A dict arbitrary environment state information. agent_infos (dict): A dict of arbitrary agent state information. For example, this may contain the hidden states from an RNN policy. step_types (numpy.ndarray): A numpy array of `StepType with shape ( batch_size,) containing the time step types for all transitions in this batch. Raises: ValueError: If any of the above attributes do not conform to their prescribed types and shapes. """ __slots__ = () def __new__(cls, env_spec, episode_infos, observations, actions, rewards, next_observations, env_infos, agent_infos, step_types): # noqa: D102 # pylint: disable=missing-return-doc, missing-return-type-doc, # pylint: disable=too-many-branches inferred_batch_size = len(rewards) if inferred_batch_size < 1: raise ValueError( 'Expected batch dimension of rewards to be greater than 1, ' 'but got length {} instead.'.format(inferred_batch_size)) first_observation = observations[0] first_action = actions[0] # observation if not env_spec.observation_space.contains(first_observation): if isinstance(env_spec.observation_space, (akro.Box, akro.Discrete, akro.Dict)): if env_spec.observation_space.flat_dim != first_observation.shape): raise ValueError('observations should have the same ' 'dimensionality as the observation_space ' '({}), but got data with shape {} ' 'instead'.format( env_spec.observation_space.flat_dim, first_observation.shape)) else: raise ValueError( 'observations must conform to observation_space {}, ' 'but got data with shape {} instead.'.format( env_spec.observation_space, first_observation.shape)) if observations.shape[0] != inferred_batch_size: raise ValueError( 'Expected batch dimension of observations to be length {}, ' 'but got length {} instead.'.format(inferred_batch_size, observations.shape[0])) # next_observation if not env_spec.observation_space.contains(next_observations[0]): if isinstance(env_spec.observation_space, (akro.Box, akro.Discrete, akro.Dict)): if env_spec.observation_space.flat_dim != next_observations[0].shape): raise ValueError('next_observations should have the same ' 'dimensionality as the observation_space ' '({}), but got data with shape {} ' 'instead'.format( env_spec.observation_space.flat_dim, next_observations[0].shape)) else: raise ValueError( 'next_observations must conform to observation_space {}, ' 'but got data with shape {} instead.'.format( env_spec.observation_space, next_observations[0].shape[0])) if next_observations.shape[0] != inferred_batch_size: raise ValueError( 'Expected batch dimension of next_observations to be length {' '}, but got length {} instead.'.format( inferred_batch_size, next_observations[0].shape[0])) # action if not env_spec.action_space.contains(first_action): if isinstance(env_spec.action_space, (akro.Box, akro.Discrete, akro.Dict)): if env_spec.action_space.flat_dim != first_action.shape): raise ValueError('actions should have the same ' 'dimensionality as the action_space ' '({}), but got data with shape {} ' 'instead'.format( env_spec.action_space.flat_dim, first_action.shape)) else: raise ValueError('actions must conform to action_space {}, ' 'but got data with shape {} instead.'.format( env_spec.action_space, first_action.shape)) if actions.shape[0] != inferred_batch_size: raise ValueError( 'Expected batch dimension of actions to be length {}, but got ' 'length {} instead.'.format(inferred_batch_size, actions.shape[0])) # rewards if rewards.shape != (inferred_batch_size, 1): raise ValueError( 'Rewards tensor must have shape {}, but got shape {} ' 'instead.'.format((inferred_batch_size, 1), rewards.shape)) # step_types if step_types.shape[0] != inferred_batch_size: raise ValueError( 'Expected batch dimension of step_types to be length {}, ' 'but got ' 'length {} instead.'.format(inferred_batch_size, rewards.shape[0])) for step_type in step_types: if not isinstance(step_type, StepType): raise ValueError( 'Each entry in step_types must be a StepType, but got' ' value type {} instead.'.format(type(step_type))) # env_infos for key, val in env_infos.items(): if not isinstance(val, (dict, np.ndarray)): raise ValueError( 'Each entry in env_infos must be a numpy array or ' 'dictionary, but got key {} with value type {} ' 'instead.'.format(key, type(val))) if (isinstance(val, np.ndarray) and val.shape[0] != inferred_batch_size): raise ValueError( 'Each entry in env_infos must have a batch dimension ' 'of ' 'length {}, but got key {} with batch size {} instead.'. format(inferred_batch_size, key, val.shape[0])) # agent_infos for key, val in agent_infos.items(): if not isinstance(val, (dict, np.ndarray)): raise ValueError( 'Each entry in agent_infos must be a numpy array or ' 'dictionary, but got key {} with value type {} instead.' 'instead'.format(key, type(val))) if (isinstance(val, np.ndarray) and val.shape[0] != inferred_batch_size): raise ValueError( 'Each entry in agent_infos must have a batch ' 'dimension of ' 'length {}, but got key {} with batch size {} instead.'. format(inferred_batch_size, key, val.shape[0])) # episode_infos for key, val in episode_infos.items(): if not isinstance(val, (dict, np.ndarray)): raise ValueError( 'Each entry in episode_infos must be a numpy array, ' 'but got key {} with value type {} instead.'.format( key, type(val))) if (isinstance(val, np.ndarray) and val.shape[0] != inferred_batch_size): raise ValueError( 'Each entry in episode_infos must have a batch ' 'dimension of ' 'length {}, but got key {} with batch size {} instead.'. format(inferred_batch_size, key, val.shape[0])) return super().__new__(TimeStepBatch, env_spec, episode_infos, observations, actions, rewards, next_observations, env_infos, agent_infos, step_types)
[docs] @classmethod def concatenate(cls, *batches): """Concatenate two or more :class:`TimeStepBatch`s. Args: batches (list[TimeStepBatch]): Batches to concatenate. Returns: TimeStepBatch: The concatenation of the batches. Raises: ValueError: If no TimeStepBatches are provided. """ if len(batches) < 1: raise ValueError('Please provide at least one TimeStepBatch to ' 'concatenate') episode_infos = { k: np.concatenate([b.episode_infos[k] for b in batches]) for k in batches[0].episode_infos.keys() } env_infos = { k: np.concatenate([b.env_infos[k] for b in batches]) for k in batches[0].env_infos.keys() } agent_infos = { k: np.concatenate([b.agent_infos[k] for b in batches]) for k in batches[0].agent_infos.keys() } return cls( env_spec=batches[0].env_spec, episode_infos=episode_infos, observations=np.concatenate( [batch.observations for batch in batches]), actions=np.concatenate([batch.actions for batch in batches]), rewards=np.concatenate([batch.rewards for batch in batches]), next_observations=np.concatenate( [batch.next_observations for batch in batches]), env_infos=env_infos, agent_infos=agent_infos, step_types=np.concatenate([batch.step_types for batch in batches]))
[docs] def split(self): """Split a :class:`~TimeStepBatch` into a list of :class:`~TimeStepBatch`s. The opposite of concatenate. Returns: list[TimeStepBatch]: A list of :class:`TimeStepBatch`s, with one :class:`~TimeStep` per :class:`~TimeStepBatch`. """ time_steps = [] for i in range(len(self.rewards)): time_step = TimeStepBatch( episode_infos={ k: np.asarray([v[i]]) for (k, v) in self.episode_infos.items() }, env_spec=self.env_spec, observations=np.asarray([self.observations[i]]), actions=np.asarray([self.actions[i]]), rewards=np.asarray([self.rewards[i]]), next_observations=np.asarray([self.next_observations[i]]), env_infos={ k: np.asarray([v[i]]) for (k, v) in self.env_infos.items() }, agent_infos={ k: np.asarray([v[i]]) for (k, v) in self.agent_infos.items() }, step_types=np.asarray([self.step_types[i]], dtype=StepType)) time_steps.append(time_step) return time_steps
[docs] def to_time_step_list(self): """Convert the batch into a list of dictionaries. Breaks the :class:`~TimeStepBatch` into a list of single time step sample dictionaries. len(rewards) (or the number of discrete time step) dictionaries are returned Returns: list[dict[str, np.ndarray or dict[str, np.ndarray]]]: Keys: episode_infos (dict[str, np.ndarray]): A dict of numpy arrays containing the episode-level information of each episode. Each value of this dict should be a numpy array of shape :math:`(S^*,)`. For example, in goal-conditioned reinforcement learning this could contain the goal state for each episode. observations (numpy.ndarray): Non-flattened array of observations. Typically has shape (batch_size, S^*) (the unflattened state space of the current environment). actions (numpy.ndarray): Non-flattened array of actions. Should have shape (batch_size, S^*) (the unflattened action space of the current environment). rewards (numpy.ndarray): Array of rewards of shape ( batch_size,) (1D array of length batch_size). next_observation (numpy.ndarray): Non-flattened array of next observations. Has shape (batch_size, S^*). next_observations[i] was observed by the agent after taking actions[i]. env_infos (dict): A dict arbitrary environment state information. agent_infos (dict): A dict of arbitrary agent state information. For example, this may contain the hidden states from an RNN policy. step_types (numpy.ndarray): A numpy array of `StepType with shape (batch_size,) containing the time step types for all transitions in this batch. """ samples = [] for i in range(len(self.rewards)): samples.append({ 'episode_infos': { k: np.asarray([v[i]]) for (k, v) in self.episode_infos.items() }, 'observations': np.asarray([self.observations[i]]), 'actions': np.asarray([self.actions[i]]), 'rewards': np.asarray([self.rewards[i]]), 'next_observations': np.asarray([self.next_observations[i]]), 'env_infos': {k: np.asarray([v[i]]) for (k, v) in self.env_infos.items()}, 'agent_infos': {k: np.asarray([v[i]]) for (k, v) in self.agent_infos.items()}, 'step_types': np.asarray([self.step_types[i]]) }) return samples
@property def terminals(self): """Get an array of boolean indicating ternianal information. Returns: numpy.ndarray: An array of boolean of shape (batch_size, 1) indicating whether the `StepType is `TERMINAL """ return np.array([[s == StepType.TERMINAL] for s in self.step_types])
[docs] @classmethod def from_time_step_list(cls, env_spec, ts_samples): """Create a :class:`~TimeStepBatch` from a list of time step dictionaries. Args: env_spec (EnvSpec): Specification for the environment from which this data was sampled. ts_samples (list[dict[str, np.ndarray or dict[str, np.ndarray]]]): keys: * episode_infos (dict[str, np.ndarray]): A dict of numpy arrays containing the episode-level information of each episode. Each value of this dict should be a numpy array of shape :math:`(N, S^*)`. For example, in goal-conditioned reinforcement learning this could contain the goal state for each episode. * observations (numpy.ndarray): Non-flattened array of observations. Typically has shape (batch_size, S^*) (the unflattened state space of the current environment). * actions (numpy.ndarray): Non-flattened array of actions. Should have shape (batch_size, S^*) (the unflattened action space of the current environment). * rewards (numpy.ndarray): Array of rewards of shape ( batch_size,) (1D array of length batch_size). * next_observation (numpy.ndarray): Non-flattened array of next observations. Has shape (batch_size, S^*). next_observations[i] was observed by the agent after taking actions[i]. * env_infos (dict): A dict arbitrary environment state information. * agent_infos (dict): A dict of arbitrary agent state information. For example, this may contain the hidden states from an RNN policy. * step_types (numpy.ndarray): A numpy array of `StepType with shape (batch_size,) containing the time step types for all transitions in this batch. Returns: TimeStepBatch: The concatenation of samples. Raises: ValueError: If no dicts are provided. """ if len(ts_samples) < 1: raise ValueError('Please provide at least one dict') ts_batches = [ TimeStepBatch(episode_infos=sample['episode_infos'], env_spec=env_spec, observations=sample['observations'], actions=sample['actions'], rewards=sample['rewards'], next_observations=sample['next_observations'], env_infos=sample['env_infos'], agent_infos=sample['agent_infos'], step_types=sample['step_types']) for sample in ts_samples ] return TimeStepBatch.concatenate(*ts_batches)
[docs] @classmethod def from_episode_batch(cls, batch): """Construct a :class:`~TimeStepBatch` from an :class:`~EpisodeBatch`. Args: batch (EpisodeBatch): Episode batch to convert. Returns: TimeStepBatch: The converted batch. """ episode_infos = dict() for i in range(len(batch.lengths)): for k, v in batch.episode_infos.items(): for _ in range(batch.lengths[i]): episode_infos.setdefault(k, []).append(v[i]) for k, v in batch.episode_infos.items(): episode_infos[k] = np.asarray(episode_infos[k]) next_observations = np.concatenate( tuple([ np.concatenate((eps.observations[1:], eps.last_observations)) for eps in batch.split() ])) return cls(episode_infos=episode_infos, env_spec=batch.env_spec, observations=batch.observations, actions=batch.actions, rewards=batch.rewards.reshape(-1, 1), next_observations=next_observations, env_infos=batch.env_infos, agent_infos=batch.agent_infos, step_types=batch.step_types)