Source code for garage.misc.tensor_utils

import gym.spaces
import numpy as np
import scipy.signal


def discount_cumsum(x, discount):
    # See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering  # noqa: E501
    # Here, we have y[t] - discount*y[t+1] = x[t]
    # or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]
    return scipy.signal.lfilter(
        [1], [1, float(-discount)], x[::-1], axis=0)[::-1]
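
# Example (illustrative sketch): with discount = 0.9 the recursion
# y[t] = x[t] + 0.9 * y[t+1], evaluated right-to-left, gives
#   discount_cumsum(np.array([1., 1., 1.]), 0.9)
#   # -> approximately array([2.71, 1.9, 1.0])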
def explained_variance_1d(ypred, y):
    assert y.ndim == 1 and ypred.ndim == 1
    vary = np.var(y)
    if np.isclose(vary, 0):
        if np.var(ypred) > 0:
            return 0
        else:
            return 1
    return 1 - np.var(y - ypred) / (vary + 1e-8)
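
# Example (illustrative sketch): perfect predictions explain all of the
# variance, while a constant zero prediction of a non-constant target
# explains essentially none of it.
#   y = np.array([1., 2., 3.])
#   explained_variance_1d(y, y)            # -> 1.0
#   explained_variance_1d(np.zeros(3), y)  # -> approximately 0.0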
def flatten_tensors(tensors):
    if tensors:
        return np.concatenate([np.reshape(x, [-1]) for x in tensors])
    else:
        return np.asarray([])
def unflatten_tensors(flattened, tensor_shapes):
    # Reverse of flatten_tensors: split a flat array back into tensors
    # with the given shapes.
    tensor_sizes = list(map(np.prod, tensor_shapes))
    indices = np.cumsum(tensor_sizes)[:-1]
    return [
        np.reshape(pair[0], pair[1])
        for pair in zip(np.split(flattened, indices), tensor_shapes)
    ]
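
# Example (illustrative sketch): round trip through flatten_tensors and
# unflatten_tensors recovers the original shapes.
#   flat = flatten_tensors([np.ones((2, 2)), np.zeros(3)])  # shape (7,)
#   unflatten_tensors(flat, [(2, 2), (3,)])
#   # -> [array of shape (2, 2), array of shape (3,)]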
def pad_tensor(x, max_len, mode='zero'):
    # Pad x along its first axis to max_len, using zeros ('zero') or the
    # last element of x ('last') as the padding value.
    padding = np.zeros_like(x[0])
    if mode == 'last':
        padding = x[-1]
    return np.concatenate(
        [x, np.tile(padding, (max_len - len(x), ) + (1, ) * np.ndim(x[0]))])
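
# Example (illustrative sketch): pad a length-3 trajectory out to max_len=5.
#   pad_tensor(np.array([1., 2., 3.]), 5)               # -> [1., 2., 3., 0., 0.]
#   pad_tensor(np.array([1., 2., 3.]), 5, mode='last')  # -> [1., 2., 3., 3., 3.]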
def pad_tensor_n(xs, max_len):
    ret = np.zeros((len(xs), max_len) + xs[0].shape[1:], dtype=xs[0].dtype)
    for idx, x in enumerate(xs):
        ret[idx][:len(x)] = x
    return ret
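
# Example (illustrative sketch): pad a batch of variable-length arrays into
# a single (batch, max_len) array.
#   pad_tensor_n([np.array([1, 2]), np.array([3])], 3)
#   # -> array([[1, 2, 0],
#   #           [3, 0, 0]])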
def pad_tensor_dict(tensor_dict, max_len, mode='zero'):
    keys = list(tensor_dict.keys())
    ret = dict()
    for k in keys:
        if isinstance(tensor_dict[k], dict):
            ret[k] = pad_tensor_dict(tensor_dict[k], max_len, mode=mode)
        else:
            ret[k] = pad_tensor(tensor_dict[k], max_len, mode=mode)
    return ret
def flatten_first_axis_tensor_dict(tensor_dict):
    # Merge the first two axes of every tensor in a (possibly nested) dict.
    keys = list(tensor_dict.keys())
    ret = dict()
    for k in keys:
        if isinstance(tensor_dict[k], dict):
            ret[k] = flatten_first_axis_tensor_dict(tensor_dict[k])
        else:
            old_shape = tensor_dict[k].shape
            ret[k] = tensor_dict[k].reshape((-1, ) + old_shape[2:])
    return ret
def high_res_normalize(probs):
    return [x / sum(map(float, probs)) for x in list(map(float, probs))]
def stack_tensor_list(tensor_list):
    return np.array(tensor_list)
    # tensor_shape = np.array(tensor_list[0]).shape
    # if tensor_shape is tuple():
    #     return np.array(tensor_list)
    # return np.vstack(tensor_list)
def stack_tensor_dict_list(tensor_dict_list):
    """Stack a list of dictionaries of {tensors or dictionary of tensors}.

    :param tensor_dict_list: a list of dictionaries of {tensors or
        dictionary of tensors}.
    :return: a dictionary of {stacked tensors or dictionary of stacked
        tensors}
    """
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        if isinstance(example, dict):
            v = stack_tensor_dict_list([x[k] for x in tensor_dict_list])
        else:
            v = stack_tensor_list([x[k] for x in tensor_dict_list])
        ret[k] = v
    return ret
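
# Example (illustrative sketch): stacking adds a new leading axis per key
# and recurses into nested dicts.
#   paths = [{'obs': np.zeros(4), 'info': {'t': np.array(0)}},
#            {'obs': np.ones(4), 'info': {'t': np.array(1)}}]
#   stack_tensor_dict_list(paths)
#   # -> {'obs': array of shape (2, 4), 'info': {'t': array of shape (2,)}}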
def concat_tensor_list_subsample(tensor_list, f):
    # Randomly keep a fraction f of the rows of each tensor, then
    # concatenate the results along the first axis.
    sub_tensor_list = [
        t[np.random.choice(len(t), int(np.ceil(len(t) * f)), replace=False)]
        for t in tensor_list
    ]
    return np.concatenate(sub_tensor_list, axis=0)
def concat_tensor_dict_list_subsample(tensor_dict_list, f):
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        if isinstance(example, dict):
            v = concat_tensor_dict_list_subsample(
                [x[k] for x in tensor_dict_list], f)
        else:
            v = concat_tensor_list_subsample(
                [x[k] for x in tensor_dict_list], f)
        ret[k] = v
    return ret
def concat_tensor_list(tensor_list):
    return np.concatenate(tensor_list, axis=0)
def concat_tensor_dict_list(tensor_dict_list):
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        if isinstance(example, dict):
            v = concat_tensor_dict_list([x[k] for x in tensor_dict_list])
        else:
            v = concat_tensor_list([x[k] for x in tensor_dict_list])
        ret[k] = v
    return ret
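
# Example (illustrative sketch): unlike stack_tensor_dict_list, this joins
# tensors along their existing first axis instead of adding a new one.
#   concat_tensor_dict_list([{'obs': np.zeros((2, 4))},
#                            {'obs': np.zeros((3, 4))}])
#   # -> {'obs': array of shape (5, 4)}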
def split_tensor_dict_list(tensor_dict):
    # Split a dict of stacked tensors along the first axis into a list of
    # dicts, recursing into nested dicts.
    keys = list(tensor_dict.keys())
    ret = None
    for k in keys:
        vals = tensor_dict[k]
        if isinstance(vals, dict):
            vals = split_tensor_dict_list(vals)
        if ret is None:
            ret = [{k: v} for v in vals]
        else:
            for v, cur_dict in zip(vals, ret):
                cur_dict[k] = v
    return ret
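
# Example (illustrative sketch): splitting along the first axis is the rough
# inverse of stack_tensor_dict_list.
#   split_tensor_dict_list({'a': np.array([1, 2]), 'b': np.array([3, 4])})
#   # -> [{'a': 1, 'b': 3}, {'a': 2, 'b': 4}]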
def truncate_tensor_list(tensor_list, truncated_len):
    return tensor_list[:truncated_len]
def truncate_tensor_dict(tensor_dict, truncated_len):
    ret = dict()
    for k, v in tensor_dict.items():
        if isinstance(v, dict):
            ret[k] = truncate_tensor_dict(v, truncated_len)
        else:
            ret[k] = truncate_tensor_list(v, truncated_len)
    return ret
def normalize_pixel_batch(env_spec, observations):
    """Normalize the observations (images).

    If the observations are images, normalize them into the range [0, 1].

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        observations (numpy.ndarray): Observations from environment.
    """
    if isinstance(env_spec.observation_space, gym.spaces.Box):
        if len(env_spec.observation_space.shape) == 3:
            return [obs.astype(np.float32) / 255.0 for obs in observations]
    return observations
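
# Example (illustrative sketch, assuming env_spec comes from a pixel-based
# environment whose observation_space is a gym.spaces.Box with shape
# (height, width, channels)): uint8 frames in [0, 255] become float32 values
# in [0, 1]; non-image observations pass through unchanged.
#   frames = [np.full((64, 64, 3), 255, dtype=np.uint8)]
#   normalize_pixel_batch(env_spec, frames)
#   # -> [array of ones with dtype float32]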