import gym.spaces
import numpy as np
import scipy.signal
def discount_cumsum(x, discount):
    """Compute the discounted cumulative sum of ``x`` along axis 0.

    Each output element satisfies ``y[t] = x[t] + discount * y[t + 1]``.

    Args:
        x (numpy.ndarray): Values to accumulate; filtered along axis 0.
        discount (float): Discount factor applied to every later element.

    Returns:
        numpy.ndarray: Discounted cumulative sums, one per element of ``x``.

    """
    # The recurrence y[t] - discount * y[t+1] = x[t] runs backwards in time.
    # Reversing the input turns it into the standard difference equation
    # rev(y)[t] - discount * rev(y)[t-1] = rev(x)[t], which lfilter solves.
    # See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering # noqa: E501
    numerator = [1]
    denominator = [1, float(-discount)]
    reversed_x = x[::-1]
    filtered = scipy.signal.lfilter(numerator, denominator, reversed_x, axis=0)
    return filtered[::-1]
def explained_variance_1d(ypred, y):
    """Compute the explained variance of 1-D predictions against targets.

    Args:
        ypred (numpy.ndarray): Predicted values; must be 1-dimensional.
        y (numpy.ndarray): Target values; must be 1-dimensional.

    Returns:
        float or int: ``1 - Var[y - ypred] / (Var[y] + 1e-8)``. When the
            variance of ``y`` is close to zero, returns 0 if ``ypred`` has
            any variance and 1 otherwise.

    """
    assert y.ndim == 1 and ypred.ndim == 1
    target_var = np.var(y)
    if not np.isclose(target_var, 0):
        residual_var = np.var(y - ypred)
        # The small epsilon keeps the division numerically safe.
        return 1 - residual_var / (target_var + 1e-8)
    # Degenerate (near-constant) target: the ratio is meaningless, so report
    # a perfect score only when the prediction is constant too.
    return 0 if np.var(ypred) > 0 else 1
def flatten_tensors(tensors):
    """Flatten a sequence of tensors into one 1-D array.

    Args:
        tensors (list[numpy.ndarray]): Tensors to flatten and join, in order.

    Returns:
        numpy.ndarray: Concatenation of every tensor reshaped to 1-D, or an
            empty array when ``tensors`` is falsy.

    """
    if not tensors:
        return np.asarray([])
    flat_parts = [np.reshape(tensor, [-1]) for tensor in tensors]
    return np.concatenate(flat_parts)
def unflatten_tensors(flattened, tensor_shapes):
    """Split a flat array back into tensors of the given shapes.

    Args:
        flattened (numpy.ndarray): 1-D array holding every tensor's values
            back to back.
        tensor_shapes (list[tuple[int, ...]]): Target shape of each tensor,
            in the order the values appear in ``flattened``. Scalar shapes
            (``()``) are supported.

    Returns:
        list[numpy.ndarray]: One array per entry of ``tensor_shapes``.

    """
    # np.prod(()) is the float 1.0; a single float size would turn every
    # split index into a float and make np.split fail when slicing, so force
    # integer sizes.
    tensor_sizes = [int(np.prod(shape)) for shape in tensor_shapes]
    # Split points are the running totals, excluding the final one.
    indices = np.cumsum(tensor_sizes, dtype=int)[:-1]
    chunks = np.split(flattened, indices)
    return [
        np.reshape(chunk, shape) for chunk, shape in zip(chunks, tensor_shapes)
    ]
def pad_tensor(x, max_len, mode='zero'):
    """Pad a tensor along its first axis up to ``max_len`` entries.

    Args:
        x (numpy.ndarray): Tensor to pad; must have at least one entry.
        max_len (int): Length of the first axis after padding.
        mode (str): ``'last'`` repeats the final entry of ``x``; any other
            value pads with zeros.

    Returns:
        numpy.ndarray: ``x`` followed by ``max_len - len(x)`` padding entries.

    """
    if mode == 'last':
        filler = x[-1]
    else:
        filler = np.zeros_like(x[0])
    n_missing = max_len - len(x)
    # Repeat the filler only along a new leading axis; keep its own axes.
    reps = (n_missing, ) + (1, ) * np.ndim(x[0])
    return np.concatenate([x, np.tile(filler, reps)])
def pad_tensor_n(xs, max_len):
    """Zero-pad a list of tensors into one batch array.

    Args:
        xs (list[numpy.ndarray]): Tensors to pad. The output's trailing
            dimensions and dtype are taken from the first tensor.
        max_len (int): Length of the second axis of the result.

    Returns:
        numpy.ndarray: Array of shape ``(len(xs), max_len, ...)`` where row
            ``i`` starts with ``xs[i]`` and is zero afterwards.

    """
    template = xs[0]
    out_shape = (len(xs), max_len) + template.shape[1:]
    padded = np.zeros(out_shape, dtype=template.dtype)
    # Iterating the output yields row views, so writes land in ``padded``.
    for row, seq in zip(padded, xs):
        row[:len(seq)] = seq
    return padded
def pad_tensor_dict(tensor_dict, max_len, mode='zero'):
    """Pad every tensor in a (possibly nested) dictionary.

    Args:
        tensor_dict (dict): Maps keys to tensors or to nested dictionaries
            of tensors.
        max_len (int): Target length passed through to ``pad_tensor``.
        mode (str): Padding mode passed through to ``pad_tensor``.

    Returns:
        dict: Dictionary with the same structure and padded tensors.

    """
    padded = {}
    for name, value in tensor_dict.items():
        # Nested dictionaries recurse; leaves are padded directly.
        pad_fn = pad_tensor_dict if isinstance(value, dict) else pad_tensor
        padded[name] = pad_fn(value, max_len, mode=mode)
    return padded
def flatten_first_axis_tensor_dict(tensor_dict):
    """Merge the first two axes of every tensor in a nested dictionary.

    Args:
        tensor_dict (dict): Maps keys to tensors or to nested dictionaries
            of tensors.

    Returns:
        dict: Dictionary with the same structure where each tensor is
            reshaped to ``(-1, ) + tensor.shape[2:]``.

    """
    flattened = {}
    for name, value in tensor_dict.items():
        if isinstance(value, dict):
            flattened[name] = flatten_first_axis_tensor_dict(value)
            continue
        trailing_dims = value.shape[2:]
        flattened[name] = value.reshape((-1, ) + trailing_dims)
    return flattened
def high_res_normalize(probs):
    """Normalize values so they sum to one, using Python floats.

    Args:
        probs (Iterable): Values convertible to ``float``. One-shot
            iterators are accepted because the input is read only once.

    Returns:
        list[float]: Each value divided by the sum of all values.

    Raises:
        ZeroDivisionError: If ``probs`` is non-empty and sums to zero.

    """
    # Materialize once: avoids recomputing the sum for every element and
    # keeps single-pass iterators from being exhausted before the sum.
    values = [float(p) for p in probs]
    total = sum(values)
    return [value / total for value in values]
def stack_tensor_list(tensor_list):
    """Convert a list of tensors into a single numpy array.

    Args:
        tensor_list (list): Tensors (or scalars) to combine.

    Returns:
        numpy.ndarray: Result of ``np.array(tensor_list)``.

    """
    stacked = np.array(tensor_list)
    return stacked
def stack_tensor_dict_list(tensor_dict_list):
    """Stack a list of dictionaries of {tensors or dictionary of tensors}.

    The key set and nesting structure are taken from the first dictionary.

    Args:
        tensor_dict_list (list[dict]): Non-empty list of dictionaries of
            {tensors or dictionary of tensors}.

    Returns:
        dict: A dictionary of {stacked tensors or dictionary of stacked
            tensors}.

    """
    first = tensor_dict_list[0]
    stacked = {}
    for key, sample in first.items():
        column = [entry[key] for entry in tensor_dict_list]
        # Nested dictionaries recurse; leaves go through stack_tensor_list.
        if isinstance(sample, dict):
            stacked[key] = stack_tensor_dict_list(column)
        else:
            stacked[key] = stack_tensor_list(column)
    return stacked
def concat_tensor_list_subsample(tensor_list, f):
    """Randomly subsample each tensor and concatenate the samples.

    Args:
        tensor_list (list[numpy.ndarray]): Tensors to subsample along their
            first axis.
        f (float): Fraction of entries to keep from each tensor; the count
            is rounded up.

    Returns:
        numpy.ndarray: Concatenation along axis 0 of the sampled entries,
            drawn without replacement via ``np.random.choice``.

    """
    sampled = []
    for tensor in tensor_list:
        total = len(tensor)
        keep = int(np.ceil(total * f))
        picks = np.random.choice(total, keep, replace=False)
        sampled.append(tensor[picks])
    return np.concatenate(sampled, axis=0)
def concat_tensor_dict_list_subsample(tensor_dict_list, f):
    """Subsample and concatenate a list of (possibly nested) tensor dicts.

    The key set and nesting structure are taken from the first dictionary.

    Args:
        tensor_dict_list (list[dict]): Non-empty list of dictionaries of
            {tensors or dictionary of tensors}.
        f (float): Sampling fraction passed to
            ``concat_tensor_list_subsample``.

    Returns:
        dict: Dictionary with the same structure whose leaves are the
            subsampled, concatenated tensors.

    """
    first = tensor_dict_list[0]
    merged = {}
    for key, sample in first.items():
        column = [entry[key] for entry in tensor_dict_list]
        if isinstance(sample, dict):
            merged[key] = concat_tensor_dict_list_subsample(column, f)
        else:
            merged[key] = concat_tensor_list_subsample(column, f)
    return merged
def concat_tensor_list(tensor_list):
    """Concatenate a list of tensors along their first axis.

    Args:
        tensor_list (list[numpy.ndarray]): Tensors to join.

    Returns:
        numpy.ndarray: Result of ``np.concatenate(tensor_list, axis=0)``.

    """
    joined = np.concatenate(tensor_list, axis=0)
    return joined
def concat_tensor_dict_list(tensor_dict_list):
    """Concatenate a list of (possibly nested) tensor dictionaries.

    The key set and nesting structure are taken from the first dictionary.

    Args:
        tensor_dict_list (list[dict]): Non-empty list of dictionaries of
            {tensors or dictionary of tensors}.

    Returns:
        dict: Dictionary with the same structure whose leaves are the
            tensors concatenated by ``concat_tensor_list``.

    """
    first = tensor_dict_list[0]
    merged = {}
    for key, sample in first.items():
        column = [entry[key] for entry in tensor_dict_list]
        if isinstance(sample, dict):
            merged[key] = concat_tensor_dict_list(column)
        else:
            merged[key] = concat_tensor_list(column)
    return merged
def split_tensor_dict_list(tensor_dict):
    """Split a dictionary of batched tensors into a list of dictionaries.

    Args:
        tensor_dict (dict): Maps keys to iterables of values, or to nested
            dictionaries of such iterables.

    Returns:
        list[dict] or None: One dictionary per entry along the first axis,
            each with the same structure as ``tensor_dict``. ``None`` when
            ``tensor_dict`` is empty.

    """
    pieces = None
    for key, column in tensor_dict.items():
        if isinstance(column, dict):
            # Nested dictionaries are split first, giving a list of dicts.
            column = split_tensor_dict_list(column)
        if pieces is None:
            # The first key decides how many output dictionaries there are.
            pieces = [{key: item} for item in column]
            continue
        for item, piece in zip(column, pieces):
            piece[key] = item
    return pieces
def truncate_tensor_list(tensor_list, truncated_len):
    """Keep only the leading ``truncated_len`` entries of a sequence.

    Args:
        tensor_list (Sequence): Sliceable sequence or tensor.
        truncated_len (int): Stop index of the slice.

    Returns:
        Sequence: ``tensor_list[:truncated_len]``.

    """
    head = tensor_list[:truncated_len]
    return head
def truncate_tensor_dict(tensor_dict, truncated_len):
    """Truncate every tensor in a (possibly nested) dictionary.

    Args:
        tensor_dict (dict): Maps keys to tensors or to nested dictionaries
            of tensors.
        truncated_len (int): Length passed to ``truncate_tensor_list``.

    Returns:
        dict: Dictionary with the same structure and truncated tensors.

    """
    truncated = {}
    for key, value in tensor_dict.items():
        # Nested dictionaries recurse; leaves are sliced directly.
        if isinstance(value, dict):
            cut = truncate_tensor_dict
        else:
            cut = truncate_tensor_list
        truncated[key] = cut(value, truncated_len)
    return truncated
def normalize_pixel_batch(env_spec, observations):
    """Normalize image observations into the range [0, 1].

    Observations are treated as images only when the environment's
    observation space is a ``gym.spaces.Box`` with a 3-dimensional shape.

    Args:
        env_spec (garage.envs.EnvSpec): Environment specification.
        observations (numpy.ndarray): Observations from environment.

    Returns:
        list[numpy.ndarray] or numpy.ndarray: For image observation spaces,
            a list of ``float32`` arrays, each divided by 255. Otherwise
            ``observations`` is returned unchanged.

    """
    obs_space = env_spec.observation_space
    is_image = (isinstance(obs_space, gym.spaces.Box)
                and len(obs_space.shape) == 3)
    if not is_image:
        return observations
    # NOTE(review): assumes pixel values lie in [0, 255] - confirm with
    # the environments that feed this function.
    return [obs.astype(np.float32) / 255.0 for obs in observations]