# Source code for garage.np._functions

"""Utility functions for NumPy-based Reinforcement learning algorithms."""
import numpy as np
import scipy.signal

def explained_variance_1d(ypred, y, valids=None):
    """Explained variation for 1D inputs.

    It is the proportion of the variance in one variable that is explained or
    predicted from another variable.

    Args:
        ypred (np.ndarray): Sample data from the first variable.
            Shape: :math:`(N, max_episode_length)`.
        y (np.ndarray): Sample data from the second variable.
            Shape: :math:`(N, max_episode_length)`.
        valids (np.ndarray): Optional argument. Array indicating valid indices.
            If None, it assumes the entire input array are valid.
            Shape: :math:`(N, max_episode_length)`.

    Returns:
        float: The explained variance.

    """
    if valids is not None:
        # Use the builtin `bool` for the mask dtype: the `np.bool` alias was
        # deprecated in NumPy 1.20 and removed in 1.24.
        ypred = ypred[valids.astype(bool)]
        y = y[valids.astype(bool)]

    assert y.ndim == 1 and ypred.ndim == 1

    vary = np.var(y)
    if np.isclose(vary, 0):
        if np.var(ypred) > 0:
            # Constant targets but varying predictions: nothing is explained.
            return 0

        # Both signals are constant: treat the degenerate variance as fully
        # explained.
        return 1

    # Epsilon guards against division by a vanishingly small variance.
    return 1 - np.var(y - ypred) / (vary + 1e-8)

def rrse(actual, predicted):
    """Root Relative Squared Error.

    Args:
        actual (np.ndarray): The actual value.
        predicted (np.ndarray): The predicted value.

    Returns:
        float: The root relative square error between the actual and the
            predicted value.

    """
    # Ratio of the prediction's squared error to the error of the naive
    # "always predict the mean" baseline.
    return np.sqrt(
        np.sum(np.square(actual - predicted)) /
        np.sum(np.square(actual - np.mean(actual))))

def sliding_window(t, window, smear=False):
    """Create a sliding window over a tensor.

    Args:
        t (np.ndarray): A tensor to create sliding window from,
            with shape :math:`(N, D)`, where N is the length of a trajectory,
            D is the dimension of each step in trajectory.
        window (int): Window size, must be less than N.
        smear (bool): If true, copy the last window so that N windows are
            generated.

    Returns:
        np.ndarray: All windows generated over t, with shape :math:`(M, W, D)`,
            where W is the window size. If smear is False, M is
            :math:`N - W`, otherwise M is N.

    Raises:
        NotImplementedError: If step_size is not 1.
        ValueError: If window size is larger than the input tensor.

    """
    # Compare against the first dimension (trajectory length), not the whole
    # shape tuple.
    if window > t.shape[0]:
        raise ValueError('`window` must be <= `t.shape[0]`')

    if window == t.shape[0]:
        return np.stack([t] * window)

    # The stride trick works only on the last dimension of an ndarray, so we
    # operate on the transpose, which reverses the dimensions of t.
    t_T = t.T

    shape = t_T.shape[:-1] + (t_T.shape[-1] - window, window)
    # Reuse the last stride so consecutive windows overlap by window - 1.
    strides = t_T.strides + (t_T.strides[-1], )
    t_T_win = np.lib.stride_tricks.as_strided(t_T,
                                              shape=shape,
                                              strides=strides)

    # t_T_win has shape (d_k, d_k-1, ..., (n - window_size), window_size)
    # To arrive at the final shape, we first transpose the result to arrive at
    # (window_size, (n - window_size), d_1, ..., d_k), then swap the first two
    # axes
    t_win = np.swapaxes(t_T_win.T, 0, 1)

    # Optionally smear the last window to preserve the first dimension (N).
    if smear:
        t_win = np.concatenate([t_win, [t_win[-1]] * window])

    return t_win

def discount_cumsum(x, discount):
    """Discounted cumulative sum.

    See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering  # noqa: E501
    Here, we have y[t] - discount*y[t+1] = x[t]
    or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]

    Args:
        x (np.ndarrary): Input.
        discount (float): Discount factor.

    Returns:
        np.ndarrary: Discounted cumulative sum.

    """
    # lfilter's numerator coefficients ([1]) were missing in the broken
    # version; the filter runs over the reversed input so each output is
    # x[t] + discount * y[t+1].
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1],
                                axis=0)[::-1]

def flatten_tensors(tensors):
    """Flatten a list of tensors.

    Args:
        tensors (list[numpy.ndarray]): List of tensors to be flattened.

    Returns:
        numpy.ndarray: Flattened tensors.

    Example:

    .. testsetup::

        from garage.np import flatten_tensors

    >>> flatten_tensors([np.array([1, 2]), np.array([3])])
    array([1, 2, 3])

    """
    if tensors:
        return np.concatenate([np.reshape(x, [-1]) for x in tensors])

    # An empty input flattens to an empty array rather than raising inside
    # np.concatenate.
    return np.asarray([])

def unflatten_tensors(flattened, tensor_shapes):
    """Unflatten a flattened tensors into a list of tensors.

    Args:
        flattened (numpy.ndarray): Flattened tensors.
        tensor_shapes (tuple): Tensor shapes.

    Returns:
        list[numpy.ndarray]: Unflattened list of tensors.

    """
    # Cast to int: np.prod(()) is 1.0 (a float), and np.split rejects float
    # split indices.
    tensor_sizes = [int(np.prod(shape)) for shape in tensor_shapes]
    indices = np.cumsum(tensor_sizes)[:-1]
    # Pair each flat chunk with its target shape and reshape it.
    return [
        np.reshape(pair[0], pair[1])
        for pair in zip(np.split(flattened, indices), tensor_shapes)
    ]

def pad_tensor(x, max_len, mode='zero'):
    """Pad tensors.

    Args:
        x (numpy.ndarray): Tensors to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad with 0.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    padding = np.zeros_like(x[0])
    if mode == 'last':
        padding = np.copy(x[-1])
    # Tile the padding element along a new leading axis so the padded tail
    # matches x's trailing dimensions, then append it.
    return np.concatenate(
        [x, np.tile(padding, (max_len - len(x), ) + (1, ) * np.ndim(x[0]))])

def pad_tensor_n(xs, max_len):
    """Pad array of tensors.

    Args:
        xs (numpy.ndarray): Tensors to be padded.
        max_len (int): Maximum length.

    Returns:
        numpy.ndarray: Padded tensor.

    """
    # Allocate from the first element's shape/dtype: the rows of xs may be
    # ragged (different lengths), so xs itself may not have a uniform shape.
    ret = np.zeros((len(xs), max_len) + xs[0].shape[1:], dtype=xs[0].dtype)
    for idx, x in enumerate(xs):
        ret[idx][:len(x)] = x

    return ret

def pad_tensor_dict(tensor_dict, max_len, mode='zero'):
    """Pad dictionary of tensors.

    Args:
        tensor_dict (dict[numpy.ndarray]): Tensors to be padded.
        max_len (int): Maximum length.
        mode (str): If 'last', pad with the last element, otherwise pad with 0.

    Returns:
        dict[numpy.ndarray]: Padded tensor.

    """
    keys = list(tensor_dict.keys())
    ret = dict()
    for k in keys:
        if isinstance(tensor_dict[k], dict):
            # Recurse into nested dictionaries of tensors.
            ret[k] = pad_tensor_dict(tensor_dict[k], max_len, mode=mode)
        else:
            ret[k] = pad_tensor(tensor_dict[k], max_len, mode=mode)

    return ret

def stack_tensor_dict_list(tensor_dict_list):
    """Stack a list of dictionaries of {tensors or dictionary of tensors}.

    Args:
        tensor_dict_list (dict[list]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    # The input is a list of dicts, so the keys (and a representative value)
    # come from the first element, not from the list itself.
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            # Recurse into nested dictionaries.
            v = stack_tensor_dict_list(dict_list)
        else:
            v = np.array(dict_list)

        ret[k] = v

    return ret

"""Stack and pad array of list of tensors.

Input paths are a list of N dicts, each with values of shape
:math:(D, S^*). This function stack and pad the values with the input
key with max_len, so output will be shape :math:(N, D, S^*).

Args:
tensor_dict_list (list[dict]): List of dict to be stacked and padded.
Value of each dict will be shape of :math:(D, S^*).
max_len (int): Maximum length for padding.

Returns:
dict: a dictionary of {stacked tensors or dictionary of
stacked tensors}. Shape: :math:(N, D, S^*)
where N is the len of input paths.

"""
keys = list(tensor_dict_list.keys())
ret = dict()
for k in keys:
example = tensor_dict_list[k]
dict_list = [x[k] if k in x else [] for x in tensor_dict_list]

if isinstance(example, dict):
else:
ret[k] = v

return ret

def concat_tensor_dict_list(tensor_dict_list):
    """Concatenate dictionary of list of tensor.

    Args:
        tensor_dict_list (dict[list]): a list of dictionaries of {tensors or
            dictionary of tensors}.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    # The input is a list of dicts; keys and a representative value come from
    # the first element, not from the list object itself.
    keys = list(tensor_dict_list[0].keys())
    ret = dict()
    for k in keys:
        example = tensor_dict_list[0][k]
        dict_list = [x[k] if k in x else [] for x in tensor_dict_list]
        if isinstance(example, dict):
            # Recurse into nested dictionaries.
            v = concat_tensor_dict_list(dict_list)
        else:
            v = np.concatenate(dict_list, axis=0)

        ret[k] = v

    return ret

def truncate_tensor_dict(tensor_dict, truncated_len):
    """Truncate dictionary of list of tensor.

    Args:
        tensor_dict (dict[numpy.ndarray]): a dictionary of {tensors or
            dictionary of tensors}.
        truncated_len (int): Length to truncate.

    Return:
        dict: a dictionary of {stacked tensors or dictionary of
            stacked tensors}

    """
    ret = dict()
    for k, v in tensor_dict.items():
        if isinstance(v, dict):
            # Recurse so nested dictionaries are truncated to the same length.
            ret[k] = truncate_tensor_dict(v, truncated_len)
        else:
            ret[k] = v[:truncated_len]

    return ret

def slice_nested_dict(dict_or_array, start, stop):
    """Slice a dictionary containing arrays (or dictionaries).

    This function is primarily intended for un-batching env_infos and
    action_infos.

    Args:
        dict_or_array (dict[str, dict or np.ndarray] or np.ndarray): A nested
            dictionary should only contain dictionaries and numpy arrays
            (recursively).
        start (int): First index to be included in the slice.
        stop (int): First index to be excluded from the slice. In other words,
            these are typical python slice indices.

    Returns:
        dict or np.ndarray: The input, but sliced.

    """
    if isinstance(dict_or_array, dict):
        # Recurse into each value, preserving the dictionary structure.
        return {
            k: slice_nested_dict(v, start, stop)
            for (k, v) in dict_or_array.items()
        }
    else:
        # It *should* be a numpy array (unless someone ignored the type
        # signature).
        return dict_or_array[start:stop]