Source code for garage.np._functions

"""Utility functions for NumPy-based Reinforcement learning algorithms."""
import numpy as np

from garage._dtypes import TrajectoryBatch
from garage.misc import tensor_utils
from garage.sampler.utils import rollout


def samples_to_tensors(paths):
    """Return processed sample data based on the collected paths.

    Args:
        paths (list[dict]): A list of collected paths.

    Returns:
        dict: Processed sample data, with keys
            * undiscounted_returns (list[float])
            * success_history (list[float])
            * complete (list[bool])

    """
    success_history = [
        path['success_count'] / path['running_length'] for path in paths
    ]
    undiscounted_returns = [path['undiscounted_return'] for path in paths]

    # check if the last path is complete
    complete = [path['dones'][-1] for path in paths]

    samples_data = dict(undiscounted_returns=undiscounted_returns,
                        success_history=success_history,
                        complete=complete)

    return samples_data
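
The expected shape of the paths argument is easiest to see with a toy example. The sketch below is illustrative only and is not part of this module; the field values are invented, but the keys match what samples_to_tensors reads.

    # Illustrative sketch, not part of garage.np._functions: made-up paths
    # carrying only the keys samples_to_tensors uses.
    paths = [
        dict(success_count=3,
             running_length=10,
             undiscounted_return=4.2,
             dones=np.array([False, False, True])),
        dict(success_count=0,
             running_length=5,
             undiscounted_return=-1.0,
             dones=np.array([False, False])),
    ]
    stats = samples_to_tensors(paths)
    # stats['success_history']      -> [0.3, 0.0]
    # stats['undiscounted_returns'] -> [4.2, -1.0]
    # stats['complete']             -> [True, False]
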

def obtain_evaluation_samples(policy, env, max_path_length=1000,
                              num_trajs=100):
    """Sample the policy for num_trajs trajectories and return average values.

    Args:
        policy (garage.Policy): Policy to use as the actor when gathering
            samples.
        env (garage.envs.GarageEnv): The environment used to obtain
            trajectories.
        max_path_length (int): Maximum path length. The episode will
            terminate when the length of the trajectory reaches
            max_path_length.
        num_trajs (int): Number of trajectories.

    Returns:
        TrajectoryBatch: Evaluation trajectories, representing the best
            current performance of the algorithm.

    """
    paths = []
    # Use a finite length rollout for evaluation.
    for _ in range(num_trajs):
        path = rollout(env,
                       policy,
                       max_path_length=max_path_length,
                       deterministic=True)
        paths.append(path)
    return TrajectoryBatch.from_trajectory_list(env.spec, paths)
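
A hedged usage sketch, not part of this module: it assumes a Gym task wrapped in garage.envs.GarageEnv, an already-trained policy bound to the name policy, and that the returned TrajectoryBatch exposes flat rewards plus per-trajectory lengths, which are split back apart to compute average undiscounted return.

    # Illustrative sketch; `policy` is assumed to be a trained garage.Policy.
    import gym

    from garage.envs import GarageEnv

    eval_env = GarageEnv(gym.make('CartPole-v1'))
    eval_trajs = obtain_evaluation_samples(policy,
                                           eval_env,
                                           max_path_length=200,
                                           num_trajs=10)
    # Split the flat reward array back into trajectories to get returns.
    per_traj_rewards = np.split(eval_trajs.rewards,
                                np.cumsum(eval_trajs.lengths)[:-1])
    avg_return = np.mean([r.sum() for r in per_traj_rewards])
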

def paths_to_tensors(paths, max_path_length, baseline_predictions, discount):
    """Return processed sample data based on the collected paths.

    Args:
        paths (list[dict]): A list of collected paths.
        max_path_length (int): Maximum length of a single rollout.
        baseline_predictions (numpy.ndarray): Predicted values of the GAE
            (Generalized Advantage Estimation) baseline.
        discount (float): Environment reward discount.

    Returns:
        dict: Processed sample data, with keys
            * observations (numpy.ndarray): Padded array of the observations
                of the environment
            * actions (numpy.ndarray): Padded array of the actions fed to
                the environment
            * rewards (numpy.ndarray): Padded array of the acquired rewards
            * agent_infos (dict): a dictionary of {stacked tensors or
                dictionary of stacked tensors}
            * env_infos (dict): a dictionary of {stacked tensors or
                dictionary of stacked tensors}
            * valids (numpy.ndarray): Padded array of the validity
                information

    """
    baselines = []
    returns = []

    for idx, path in enumerate(paths):
        # baselines
        path['baselines'] = baseline_predictions[idx]
        baselines.append(path['baselines'])

        # returns
        path['returns'] = tensor_utils.discount_cumsum(
            path['rewards'], discount)
        returns.append(path['returns'])

    obs = [path['observations'] for path in paths]
    obs = tensor_utils.pad_tensor_n(obs, max_path_length)

    actions = [path['actions'] for path in paths]
    actions = tensor_utils.pad_tensor_n(actions, max_path_length)

    rewards = [path['rewards'] for path in paths]
    rewards = tensor_utils.pad_tensor_n(rewards, max_path_length)

    agent_infos = [path['agent_infos'] for path in paths]
    agent_infos = tensor_utils.stack_tensor_dict_list([
        tensor_utils.pad_tensor_dict(p, max_path_length) for p in agent_infos
    ])

    env_infos = [path['env_infos'] for path in paths]
    env_infos = tensor_utils.stack_tensor_dict_list(
        [tensor_utils.pad_tensor_dict(p, max_path_length) for p in env_infos])

    valids = [np.ones_like(path['returns']) for path in paths]
    valids = tensor_utils.pad_tensor_n(valids, max_path_length)

    samples_data = dict(observations=obs,
                        actions=actions,
                        rewards=rewards,
                        agent_infos=agent_infos,
                        env_infos=env_infos,
                        valids=valids)

    return samples_data
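
To make the padding behaviour concrete, here is a minimal sketch that is not part of the module: two short, made-up rollouts (observation dimension 2, action dimension 1, a hypothetical 'mean' agent_infos entry) padded to max_path_length=4.

    # Illustrative sketch; all shapes and values below are made up.
    paths = [
        dict(observations=np.zeros((3, 2)),
             actions=np.zeros((3, 1)),
             rewards=np.ones(3),
             agent_infos=dict(mean=np.zeros((3, 1))),
             env_infos=dict()),
        dict(observations=np.zeros((2, 2)),
             actions=np.zeros((2, 1)),
             rewards=np.ones(2),
             agent_infos=dict(mean=np.zeros((2, 1))),
             env_infos=dict()),
    ]
    baseline_predictions = [np.zeros(3), np.zeros(2)]
    samples = paths_to_tensors(paths,
                               max_path_length=4,
                               baseline_predictions=baseline_predictions,
                               discount=0.99)
    # samples['observations'].shape -> (2, 4, 2)
    # samples['rewards'].shape      -> (2, 4)
    # samples['valids'][0]          -> array([1., 1., 1., 0.])
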