# Source code for garage.np._functions

"""Utility functions for NumPy-based Reinforcement learning algorithms."""
import numpy as np
import scipy.signal

def explained_variance_1d(ypred, y, valids=None):
    """Explained variation for 1D inputs.

    It is the proportion of the variance in one variable that is explained or
    predicted from another variable.

    Args:
        ypred (np.ndarray): Sample data from the first variable.
            Shape: :math:`(N, max_episode_length)`.
        y (np.ndarray): Sample data from the second variable.
            Shape: :math:`(N, max_episode_length)`.
        valids (np.ndarray): Optional argument. Array indicating valid indices.
            If None, it assumes the entire input array are valid.
            Shape: :math:`(N, max_episode_length)`.

    Returns:
        float: The explained variance.

    """
    if valids is not None:
        # Use the builtin `bool` for the mask dtype: the `np.bool` alias was
        # deprecated in NumPy 1.20 and removed in 1.24.
        ypred = ypred[valids.astype(bool)]
        y = y[valids.astype(bool)]

    assert y.ndim == 1 and ypred.ndim == 1

    vary = np.var(y)
    if np.isclose(vary, 0):
        if np.var(ypred) > 0:
            # Constant targets but varying predictions: nothing is explained.
            return 0

        # Both signals are constant: treat the degenerate variance as fully
        # explained.
        return 1

    # Epsilon guards against division by a vanishingly small variance.
    return 1 - np.var(y - ypred) / (vary + 1e-8)

def rrse(actual, predicted):
    """Root Relative Squared Error.

    Args:
        actual (np.ndarray): The actual value.
        predicted (np.ndarray): The predicted value.

    Returns:
        float: The root relative square error between the actual and the
            predicted value.

    """
    # Ratio of the prediction's squared error to the error of the naive
    # "always predict the mean" baseline.
    return np.sqrt(
        np.sum(np.square(actual - predicted)) /
        np.sum(np.square(actual - np.mean(actual))))

def sliding_window(t, window, smear=False):
    """Create a sliding window over a tensor.

    Args:
        t (np.ndarray): A tensor to create sliding window from,
            with shape :math:`(N, D)`, where N is the length of a trajectory,
            D is the dimension of each step in trajectory.
        window (int): Window size, must be less than N.
        smear (bool): If true, copy the last window so that N windows are
            generated.

    Returns:
        np.ndarray: All windows generated over t, with shape :math:`(M, W, D)`,
            where W is the window size. If smear is False, M is
            :math:`N - W`, otherwise M is N.

    Raises:
        NotImplementedError: If step_size is not 1.
        ValueError: If window size is larger than the input tensor.

    """
    # Compare against the first dimension (trajectory length), not the whole
    # shape tuple.
    if window > t.shape[0]:
        raise ValueError('`window` must be <= `t.shape[0]`')

    if window == t.shape[0]:
        return np.stack([t] * window)

    # The stride trick works only on the last dimension of an ndarray, so we
    # operate on the transpose, which reverses the dimensions of t.
    t_T = t.T

    shape = t_T.shape[:-1] + (t_T.shape[-1] - window, window)
    # Reuse the last stride so consecutive windows overlap by window - 1.
    strides = t_T.strides + (t_T.strides[-1], )
    t_T_win = np.lib.stride_tricks.as_strided(t_T,
                                              shape=shape,
                                              strides=strides)

    # t_T_win has shape (d_k, d_k-1, ..., (n - window_size), window_size)
    # To arrive at the final shape, we first transpose the result to arrive at
    # (window_size, (n - window_size), d_1, ..., d_k), then swap the first two
    # axes
    t_win = np.swapaxes(t_T_win.T, 0, 1)

    # Optionally smear the last window to preserve the first dimension (N).
    if smear:
        t_win = np.concatenate([t_win, [t_win[-1]] * window])

    return t_win

def discount_cumsum(x, discount):
    """Discounted cumulative sum.

    See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering  # noqa: E501
    Here, we have y[t] - discount*y[t+1] = x[t]
    or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]

    Args:
        x (np.ndarrary): Input.
        discount (float): Discount factor.

    Returns:
        np.ndarrary: Discounted cumulative sum.

    """
    # lfilter's numerator coefficients ([1]) were missing in the broken
    # version; the filter runs over the reversed input so each output is
    # x[t] + discount * y[t+1].
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1],
                                axis=0)[::-1]

def flatten_tensors(tensors):
    """Flatten a list of tensors.

    Args:
        tensors (list[numpy.ndarray]): List of tensors to be flattened.

    Returns:
        numpy.ndarray: Flattened tensors.

    Example:

    .. testsetup::

        from garage.np import flatten_tensors

    >>> flatten_tensors([np.array([1, 2]), np.array([3])])
    array([1, 2, 3])

    """
    if tensors:
        return np.concatenate([np.reshape(x, [-1]) for x in tensors])

    # An empty input flattens to an empty array rather than raising inside
    # np.concatenate.
    return np.asarray([])

def unflatten_tensors(flattened, tensor_shapes):
    """Unflatten a flattened tensors into a list of tensors.

    Args:
        flattened (numpy.ndarray): Flattened tensors.
        tensor_shapes (tuple): Tensor shapes.

    Returns:
        list[numpy.ndarray]: Unflattened list of tensors.

    """
    # Cast to int: np.prod(()) is 1.0 (a float), and np.split rejects float
    # split indices.
    tensor_sizes = [int(np.prod(shape)) for shape in tensor_shapes]
    indices = np.cumsum(tensor_sizes)[:-1]
    # Pair each flat chunk with its target shape and reshape it.
    return [
        np.reshape(pair[0], pair[1])
        for pair in zip(np.split(flattened, indices), tensor_shapes)
    ]

def pad_tensor(x, max_len, mode='zero'):
    """Pad tensors.

    Args:
        x (numpy.ndarray): Tensors to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad with 0.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    padding = np.zeros_like(x[0])
    if mode == 'last':
        padding = np.copy(x[-1])
    # Tile the padding element along a new leading axis so the padded tail
    # matches x's trailing dimensions, then append it.
    return np.concatenate(
        [x, np.tile(padding, (max_len - len(x), ) + (1, ) * np.ndim(x[0]))])

def pad_tensor_n(xs, max_len):
    """Pad array of tensors.

    Args:
        xs (numpy.ndarray): Tensors to be padded.
        max_len (int): Maximum length.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    # Allocate from the first element's shape/dtype: the rows of xs may be
    # ragged (different lengths), so xs itself may not have a uniform shape.
    ret = np.zeros((len(xs), max_len) + xs[0].shape[1:], dtype=xs[0].dtype)
    for idx, x in enumerate(xs):
        ret[idx][:len(x)] = x

    return ret

def pad_tensor_dict(tensor_dict, max_len, mode='zero'):
    """Pad dictionary of tensors.

    Args:
        tensor_dict (dict[numpy.ndarray]): Tensors to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad with 0.

    Returns:
        dict[numpy.ndarray]: Padded tensor.

    """
    keys = list(tensor_dict.keys())
    ret = dict()
    for k in keys:
        if isinstance(tensor_dict[k], dict):
            # Recurse into nested dictionaries of tensors.
            ret[k] = pad_tensor_dict(tensor_dict[k], max_len, mode=mode)
        else:
            ret[k] = pad_tensor(tensor_dict[k], max_len, mode=mode)

    return ret

def stack_tensor_dict_list(tensor_dict_list):
    """Stack a list of dictionaries of {tensors or dictionary of tensors}.

    Args:
        tensor_dict_list (dict[list]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    # The input is a list of dicts, so the keys (and a representative value)
    # come from the first element, not from the list itself.
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            # Recurse into nested dictionaries.
            v = stack_tensor_dict_list(dict_list)
        else:
            v = np.array(dict_list)

        ret[k] = v

    return ret

"""Stack and pad array of list of tensors.

Input paths are a list of N dicts, each with values of shape
:math:(D, S^*). This function stack and pad the values with the input
key with max_len, so output will be shape :math:(N, D, S^*).

Args:
tensor_dict_list (list[dict]): List of dict to be stacked and padded.
Value of each dict will be shape of :math:(D, S^*).
max_len (int): Maximum length for padding.

Returns:
dict: a dictionary of {stacked tensors or dictionary of
stacked tensors}. Shape: :math:(N, D, S^*)
where N is the len of input paths.

"""
keys = list(tensor_dict_list.keys())
ret = dict()
for k in keys:
example = tensor_dict_list[k]
dict_list = [x[k] if k in x else [] for x in tensor_dict_list]

if isinstance(example, dict):
else:
ret[k] = v

return ret

def concat_tensor_dict_list(tensor_dict_list):
    """Concatenate dictionary of list of tensor.

    Args:
        tensor_dict_list (dict[list]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    # The input is a list of dicts; keys and a representative value come from
    # the first element, not from the list object itself.
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            # Recurse into nested dictionaries.
            v = concat_tensor_dict_list(dict_list)
        else:
            v = np.concatenate(dict_list, axis=0)

        ret[k] = v

    return ret

def truncate_tensor_dict(tensor_dict, truncated_len):
    """Truncate dictionary of list of tensor.

    Args:
        tensor_dict (dict[numpy.ndarray]): a dictionary of {tensors or
            dictionary of tensors}.
        truncated_len (int): Length to truncate.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    ret = dict()
    for k, v in tensor_dict.items():
        if isinstance(v, dict):
            # Recurse so nested dictionaries are truncated to the same length.
            ret[k] = truncate_tensor_dict(v, truncated_len)
        else:
            ret[k] = v[:truncated_len]

    return ret

def slice_nested_dict(dict_or_array, start, stop):
    """Slice a dictionary containing arrays (or dictionaries).

    This function is primarily intended for un-batching env_infos and
    action_infos.

    Args:
        dict_or_array (dict[str, dict or np.ndarray] or np.ndarray): A nested
            dictionary should only contain dictionaries and numpy arrays
            (recursively).
        start (int): First index to be included in the slice.
        stop (int): First index to be excluded from the slice. In other words,
            these are typical python slice indices.

    Returns:
        dict or np.ndarray: The input, but sliced.

    """
    if isinstance(dict_or_array, dict):
        # Recurse into each value, preserving the dictionary structure.
        return {
            k: slice_nested_dict(v, start, stop)
            for (k, v) in dict_or_array.items()
        }
    else:
        # It *should* be a numpy array (unless someone ignored the type
        # signature).
        return dict_or_array[start:stop]