Source code for garage.torch._functions

"""Utility functions for PyTorch algorithms.

A collection of common functions that are used by Pytorch algos.

This collection of functions can be used to manage the following:
    - Pytorch GPU usage
        - setting the default Pytorch GPU
        - converting Tensors to GPU Tensors
        - Converting Tensors into `numpy.ndarray` format and vice versa
    - Updating model parameters
"""
import torch
import torch.nn.functional as F

_USE_GPU = False
_DEVICE = None
_GPU_ID = 0


def compute_advantages(discount, gae_lambda, max_path_length, baselines,
                       rewards):
    """Calculate advantages.

    Advantages are a discounted cumulative sum.

    Calculate advantages using a baseline according to Generalized Advantage
    Estimation (GAE).

    The discounted cumulative sum can be computed using conv2d with filter.
    filter:
        [1, (discount * gae_lambda), (discount * gae_lambda) ^ 2, ...]
        where the length is the same as max_path_length.

    baselines and rewards also have the same shape:
        baselines:
        [ [b_11, b_12, b_13, ... b_1n],
          [b_21, b_22, b_23, ... b_2n],
          ...
          [b_m1, b_m2, b_m3, ... b_mn] ]

        rewards:
        [ [r_11, r_12, r_13, ... r_1n],
          [r_21, r_22, r_23, ... r_2n],
          ...
          [r_m1, r_m2, r_m3, ... r_mn] ]

    Args:
        discount (float): RL discount factor (i.e. gamma).
        gae_lambda (float): Lambda, as used for Generalized Advantage
            Estimation (GAE).
        max_path_length (int): Maximum length of a single rollout.
        baselines (torch.Tensor): A 2D tensor of value function estimates
            with shape (N, T), where N is the batch dimension (number of
            episodes) and T is the maximum path length experienced by the
            agent. If an episode terminates in fewer than T time steps, the
            remaining elements in that episode should be set to 0.
        rewards (torch.Tensor): A 2D tensor of per-step rewards with shape
            (N, T), where N is the batch dimension (number of episodes) and T
            is the maximum path length experienced by the agent. If an
            episode terminates in fewer than T time steps, the remaining
            elements in that episode should be set to 0.

    Returns:
        torch.Tensor: A 2D tensor of calculated advantage values with shape
            (N, T), where N is the batch dimension (number of episodes) and T
            is the maximum path length experienced by the agent. If an
            episode terminates in fewer than T time steps, the remaining
            values in that episode should be set to 0.

    """
    adv_filter = torch.full((1, 1, 1, max_path_length - 1),
                            discount * gae_lambda,
                            dtype=torch.float)
    adv_filter = torch.cumprod(F.pad(adv_filter, (1, 0), value=1), dim=-1)

    deltas = (rewards + discount * F.pad(baselines, (0, 1))[:, 1:] -
              baselines)
    deltas = F.pad(deltas,
                   (0, max_path_length - 1)).unsqueeze(0).unsqueeze(0)

    advantages = F.conv2d(deltas, adv_filter,
                          stride=1).reshape(rewards.shape)
    return advantages
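
# A minimal usage sketch (hypothetical values, not part of the original
# module): compute GAE advantages for two zero-padded episodes of length 4.
#
#     rewards = torch.ones(2, 4)
#     baselines = torch.zeros(2, 4)
#     advantages = compute_advantages(0.99, 0.95, 4, baselines, rewards)
#     assert advantages.shape == (2, 4)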


def pad_to_last(nums, total_length, axis=-1, val=0):
    """Pad val to the end of nums along the given axis.

    The length of the result along that axis will be total_length.

    Args:
        nums (numpy.ndarray): The array to pad.
        total_length (int): The final width of the array along the given
            axis.
        axis (int): Axis along which to pad.
        val (int): The value used for padding.

    Returns:
        torch.Tensor: Padded array.

    Raises:
        IndexError: If the input axis value is out of range of the nums
            array.

    """
    tensor = torch.Tensor(nums)
    axis = (axis + len(tensor.shape)) if axis < 0 else axis

    if len(tensor.shape) <= axis:
        raise IndexError('axis {} is out of range {}'.format(
            axis, tensor.shape))

    padding_config = [0, 0] * len(tensor.shape)
    padding_idx = abs(axis - len(tensor.shape)) * 2 - 1
    padding_config[padding_idx] = max(total_length - tensor.shape[axis], val)
    return F.pad(tensor, padding_config)
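
# A minimal usage sketch (hypothetical values): right-pad a short sequence
# with zeros up to a fixed length.
#
#     padded = pad_to_last([1, 2, 3], total_length=5)
#     # tensor([1., 2., 3., 0., 0.])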


def filter_valids(tensor, valids):
    """Filter out tensor using valids (last index of valid tensors).

    valids contains the length of the valid portion of each row.

    Args:
        tensor (torch.Tensor): The tensor to filter.
        valids (list[int]): Array of lengths of the valid values.

    Returns:
        list[torch.Tensor]: Filtered tensors.

    """
    return [tensor[i][:valid] for i, valid in enumerate(valids)]
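
# A minimal usage sketch (hypothetical shapes): keep only the un-padded steps
# of two episodes whose valid lengths are 4 and 2.
#
#     obs = torch.zeros(2, 4, 3)                # (N, T, obs_dim), zero-padded
#     valid_obs = filter_valids(obs, [4, 2])
#     # valid_obs[0].shape == (4, 3), valid_obs[1].shape == (2, 3)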


def dict_np_to_torch(array_dict):
    """Convert a dict whose values are numpy arrays to PyTorch tensors.

    Modifies array_dict in place.

    Args:
        array_dict (dict): Dictionary of data in numpy arrays.

    Returns:
        dict: Dictionary of data in PyTorch tensors.

    """
    for key, value in array_dict.items():
        array_dict[key] = torch.from_numpy(value).float().to(global_device())
    return array_dict
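
# A minimal usage sketch (hypothetical batch, not part of the original
# module): move a numpy sample batch onto the global device as float tensors.
#
#     import numpy as np
#     batch = {'observation': np.zeros((8, 3)), 'reward': np.ones(8)}
#     batch = dict_np_to_torch(batch)
#     # every value is now a torch.FloatTensor on global_device()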


def torch_to_np(tensors):
    """Convert PyTorch tensors to numpy arrays.

    Args:
        tensors (tuple): Tuple of data in PyTorch tensors.

    Returns:
        tuple[numpy.ndarray]: Tuple of data in numpy arrays.

    Note: This method is deprecated and now replaced by
        `garage.torch._functions.to_numpy`.

    """
    value_out = tuple(v.numpy() for v in tensors)
    return value_out


def flatten_batch(tensor):
    """Flatten a batch of observations.

    Reshape a tensor of size (X, Y, Z) into (X*Y, Z).

    Args:
        tensor (torch.Tensor): Tensor to flatten.

    Returns:
        torch.Tensor: Flattened tensor.

    """
    return tensor.reshape((-1, ) + tensor.shape[2:])
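
# A minimal usage sketch: collapse the (episode, time step) dimensions.
#
#     x = torch.arange(24.).reshape(2, 3, 4)
#     flatten_batch(x).shape   # torch.Size([6, 4])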


def update_module_params(module, new_params):  # noqa: D202
    """Load parameters to a module.

    This function acts like `torch.nn.Module._load_from_state_dict()`, but
    it replaces the tensors in module with those in new_params, while
    `_load_from_state_dict()` loads only the value.

    Use this function so that the `grad` and `grad_fn` of `new_params` can
    be restored.

    Args:
        module (torch.nn.Module): A torch module.
        new_params (dict): A dict of torch tensors used as the new parameters
            of this module. This parameter dict should be generated by
            `torch.nn.Module.named_parameters()`.

    """

    # pylint: disable=protected-access
    def update(m, name, param):
        del m._parameters[name]  # noqa: E501
        setattr(m, name, param)
        m._parameters[name] = param  # noqa: E501

    named_modules = dict(module.named_modules())

    for name, new_param in new_params.items():
        if '.' in name:
            module_name, param_name = tuple(name.rsplit('.', 1))
            if module_name in named_modules:
                update(named_modules[module_name], param_name, new_param)
        else:
            update(module, name, new_param)
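
# A minimal usage sketch (hypothetical MAML-style adaptation step, not part
# of the original module): build an adapted parameter dict that keeps its
# grad_fn, then write it back into the module.
#
#     net = torch.nn.Linear(3, 2)
#     loss = net(torch.ones(1, 3)).sum()
#     grads = torch.autograd.grad(loss, net.parameters(), create_graph=True)
#     adapted = {name: p - 0.1 * g
#                for (name, p), g in zip(net.named_parameters(), grads)}
#     update_module_params(net, adapted)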


def set_gpu_mode(mode, gpu_id=0):
    """Set GPU mode and device ID.

    Args:
        mode (bool): Whether or not to use GPU.
        gpu_id (int): GPU ID.

    """
    # pylint: disable=global-statement
    global _GPU_ID
    global _USE_GPU
    global _DEVICE
    _GPU_ID = gpu_id
    _USE_GPU = mode
    _DEVICE = torch.device(('cuda:' + str(_GPU_ID)) if _USE_GPU else 'cpu')


def global_device():
    """Returns the global device that torch.Tensors should be placed on.

    Note: The global device is set by using the function
        `garage.torch._functions.set_gpu_mode`. If this function is never
        called, `garage.torch._functions.global_device()` returns None.

    Returns:
        `torch.device`: The global device that newly created torch.Tensors
            should be placed on.

    """
    # pylint: disable=global-statement
    global _DEVICE
    return _DEVICE
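
# A minimal usage sketch: enable GPU placement when CUDA is available and
# create a tensor on the resulting global device.
#
#     set_gpu_mode(torch.cuda.is_available())
#     device = global_device()
#     x = torch.zeros(4).to(device)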


def product_of_gaussians(mus, sigmas_squared):
    """Compute mu, sigma of product of gaussians.

    Args:
        mus (torch.Tensor): Means, with shape :math:`(N, M)`. M is the number
            of mean values.
        sigmas_squared (torch.Tensor): Variances, with shape :math:`(N, V)`.
            V is the number of variance values.

    Returns:
        torch.Tensor: Mu of product of gaussians, with shape :math:`(M, )`
            (the N Gaussians are combined along the first dimension).
        torch.Tensor: Sigma of product of gaussians, with shape
            :math:`(M, )`.

    """
    sigmas_squared = torch.clamp(sigmas_squared, min=1e-7)
    sigma_squared = 1. / torch.sum(torch.reciprocal(sigmas_squared), dim=0)
    mu = sigma_squared * torch.sum(mus / sigmas_squared, dim=0)
    return mu, sigma_squared
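
# A minimal usage sketch (hypothetical values): combine three unit Gaussians
# over the same 5-dimensional latent; the product has variance 1/3 in every
# dimension.
#
#     mus = torch.zeros(3, 5)
#     sigmas_squared = torch.ones(3, 5)
#     mu, sigma_squared = product_of_gaussians(mus, sigmas_squared)
#     # mu.shape == sigma_squared.shape == (5,)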