
"""Utiliy functions for tensors."""
import gym.spaces
import numpy as np
import scipy.signal


def discount_cumsum(x, discount):
    """Discounted cumulative sum.

    See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering  # noqa: E501

    Here, we have y[t] - discount*y[t+1] = x[t]
    or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]

    Args:
        x (np.ndarray): Input.
        discount (float): Discount factor.

    Returns:
        np.ndarray: Discounted cumulative sum.

    """
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1],
                                axis=0)[::-1]


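# Illustrative example (added for clarity, not part of the original module):
# for rewards [1, 1, 1] and discount 0.99 the result is
# [1 + 0.99*(1 + 0.99*1), 1 + 0.99*1, 1], i.e. roughly [2.9701, 1.99, 1.0]
# (up to floating-point rounding):
#
#   >>> discount_cumsum(np.array([1.0, 1.0, 1.0]), 0.99)
#   array([2.9701, 1.99  , 1.    ])

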
def explained_variance_1d(ypred, y):
    """Explained variation for 1D inputs.

    It is the proportion of the variance in one variable that is explained
    or predicted from another variable.

    Args:
        ypred (np.ndarray): Sample data from the first variable.
        y (np.ndarray): Sample data from the second variable.

    Returns:
        float: The explained variance.

    """
    assert y.ndim == 1 and ypred.ndim == 1
    vary = np.var(y)
    if np.isclose(vary, 0):
        if np.var(ypred) > 0:
            return 0
        return 1
    return 1 - np.var(y - ypred) / (vary + 1e-8)


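# Illustrative example (added for clarity, not part of the original module):
# a perfect prediction explains all of the variance (result ~1.0), while a
# constant prediction of a varying target explains essentially none of it
# (result ~0.0, up to the 1e-8 term in the denominator):
#
#   >>> y = np.array([1.0, 2.0, 3.0])
#   >>> explained_variance_1d(y, y)             # ~1.0
#   >>> explained_variance_1d(np.zeros(3), y)   # ~0.0

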
def flatten_tensors(tensors):
    """Flatten a list of tensors.

    Args:
        tensors (list[numpy.ndarray]): List of tensors to be flattened.

    Returns:
        numpy.ndarray: Flattened tensors.

    """
    if tensors:
        return np.concatenate([np.reshape(x, [-1]) for x in tensors])
    return np.asarray([])


def unflatten_tensors(flattened, tensor_shapes):
    """Unflatten a flattened tensor into a list of tensors.

    Args:
        flattened (numpy.ndarray): Flattened tensors.
        tensor_shapes (tuple): Tensor shapes.

    Returns:
        list[numpy.ndarray]: Unflattened list of tensors.

    """
    tensor_sizes = list(map(np.prod, tensor_shapes))
    indices = np.cumsum(tensor_sizes)[:-1]
    return [
        np.reshape(pair[0], pair[1])
        for pair in zip(np.split(flattened, indices), tensor_shapes)
    ]


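# Illustrative example (added for clarity, not part of the original module):
# flatten_tensors and unflatten_tensors are inverses given the original
# shapes, e.g. for packing a set of parameter arrays into a single vector
# and recovering them later:
#
#   >>> a, b = np.ones((2, 3)), np.zeros((4,))
#   >>> flat = flatten_tensors([a, b])                 # shape (10,)
#   >>> x, y = unflatten_tensors(flat, [(2, 3), (4,)])
#   >>> x.shape, y.shape                               # ((2, 3), (4,))

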
def pad_tensor(x, max_len, mode='zero'):
    """Pad tensors.

    Args:
        x (numpy.ndarray): Tensor to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad
            with 0.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    padding = np.zeros_like(x[0])
    if mode == 'last':
        padding = x[-1]
    return np.concatenate(
        [x, np.tile(padding, (max_len - len(x), ) + (1, ) * np.ndim(x[0]))])


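# Illustrative example (added for clarity, not part of the original module):
# padding a length-3 sequence to max_len=5. With the default mode the tail is
# zero-filled; with mode='last' the final element is repeated:
#
#   >>> x = np.array([1.0, 2.0, 3.0])
#   >>> pad_tensor(x, 5)                 # array([1., 2., 3., 0., 0.])
#   >>> pad_tensor(x, 5, mode='last')    # array([1., 2., 3., 3., 3.])

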
def pad_tensor_n(xs, max_len):
    """Pad array of tensors.

    Args:
        xs (numpy.ndarray): Tensors to be padded.
        max_len (int): Maximum length.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    ret = np.zeros((len(xs), max_len) + xs[0].shape[1:], dtype=xs[0].dtype)
    for idx, x in enumerate(xs):
        ret[idx][:len(x)] = x
    return ret


def pad_tensor_dict(tensor_dict, max_len, mode='zero'):
    """Pad dictionary of tensors.

    Args:
        tensor_dict (dict[numpy.ndarray]): Tensors to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad
            with 0.

    Returns:
        dict[numpy.ndarray]: Padded tensor.

    """
    keys = list(tensor_dict.keys())
    ret = dict()
    for k in keys:
        if isinstance(tensor_dict[k], dict):
            ret[k] = pad_tensor_dict(tensor_dict[k], max_len, mode=mode)
        else:
            ret[k] = pad_tensor(tensor_dict[k], max_len, mode=mode)
    return ret


def stack_tensor_dict_list(tensor_dict_list):
    """Stack a list of dictionaries of {tensors or dictionary of tensors}.

    Args:
        tensor_dict_list (list[dict]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            v = stack_tensor_dict_list(dict_list)
        else:
            v = np.array(dict_list)
        ret[k] = v
    return ret


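# Illustrative example (added for clarity, not part of the original module):
# stacking per-path dictionaries adds a new leading axis to each entry and
# recurses into nested dictionaries (e.g. per-step info dictionaries):
#
#   >>> paths = [{'obs': np.zeros(3), 'info': {'t': np.array(0)}},
#   ...          {'obs': np.ones(3), 'info': {'t': np.array(1)}}]
#   >>> out = stack_tensor_dict_list(paths)
#   >>> out['obs'].shape       # (2, 3)
#   >>> out['info']['t']       # array([0, 1])

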
def concat_tensor_dict_list(tensor_dict_list):
    """Concatenate a list of dictionaries of {tensors or dictionary of tensors}.

    Args:
        tensor_dict_list (list[dict]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {concatenated tensors or dictionary of
            concatenated tensors}

    """
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            v = concat_tensor_dict_list(dict_list)
        else:
            v = np.concatenate(dict_list, axis=0)
        ret[k] = v
    return ret


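# Illustrative example (added for clarity, not part of the original module):
# unlike stack_tensor_dict_list, which adds a new leading axis, this joins
# entries along the existing first axis, e.g. to merge several paths' data
# into one batch:
#
#   >>> paths = [{'obs': np.zeros((2, 3))}, {'obs': np.ones((5, 3))}]
#   >>> concat_tensor_dict_list(paths)['obs'].shape   # (7, 3)

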
def split_tensor_dict_list(tensor_dict):
    """Split a dictionary of stacked tensors into a list of dictionaries.

    Args:
        tensor_dict (dict[numpy.ndarray]): a dictionary of {tensors or
            dictionary of tensors}.

    Return:
        list[dict]: a list of dictionaries of {tensors or dictionary of
            tensors}, one per index along the leading axis.

    """
    keys = list(tensor_dict.keys())
    ret = None
    for k in keys:
        vals = tensor_dict[k]
        if isinstance(vals, dict):
            vals = split_tensor_dict_list(vals)
        if ret is None:
            ret = [{k: v} for v in vals]
        else:
            for v, cur_dict in zip(vals, ret):
                cur_dict[k] = v
    return ret


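# Illustrative example (added for clarity, not part of the original module):
# split_tensor_dict_list is roughly the inverse of stack_tensor_dict_list; it
# turns a dictionary of stacked tensors back into a per-index list of
# dictionaries:
#
#   >>> stacked = {'obs': np.arange(6).reshape(2, 3), 'act': np.array([0, 1])}
#   >>> split_tensor_dict_list(stacked)
#   [{'obs': array([0, 1, 2]), 'act': 0}, {'obs': array([3, 4, 5]), 'act': 1}]

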
def truncate_tensor_dict(tensor_dict, truncated_len):
    """Truncate a dictionary of tensors.

    Args:
        tensor_dict (dict[numpy.ndarray]): a dictionary of {tensors or
            dictionary of tensors}.
        truncated_len (int): Length to truncate to.

    Return:
        dict: a dictionary of {truncated tensors or dictionary of
            truncated tensors}

    """
    ret = dict()
    for k, v in tensor_dict.items():
        if isinstance(v, dict):
            ret[k] = truncate_tensor_dict(v, truncated_len)
        else:
            ret[k] = v[:truncated_len]
    return ret


def normalize_pixel_batch(env_spec, observations):
    """Normalize the observations (images).

    If the inputs are images, they are normalized into the range [0, 1].

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        observations (numpy.ndarray): Observations from environment.

    Returns:
        numpy.ndarray: Normalized observations.

    """
    if isinstance(env_spec.observation_space, gym.spaces.Box):
        if len(env_spec.observation_space.shape) == 3:
            return [obs.astype(np.float32) / 255.0 for obs in observations]
    return observations


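# Illustrative example (added for clarity, not part of the original module):
# observations are scaled from uint8 pixel values to floats in [0, 1] only
# when the observation space is a 3-dimensional (image-shaped) gym Box;
# non-image observations pass through unchanged. Here, image_env_spec is a
# hypothetical EnvSpec whose observation_space is a (48, 48, 3) Box:
#
#   >>> obs = [np.full((48, 48, 3), 255, dtype=np.uint8)]
#   >>> normalize_pixel_batch(image_env_spec, obs)   # values become ~1.0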