Source code for garage.envs.normalized_env

"""An environment wrapper that normalizes action, observation and reward."""
import gym
import gym.spaces
import gym.spaces.utils
import numpy as np


class NormalizedEnv(gym.Wrapper):
    """An environment wrapper for normalization.

    This wrapper normalizes action, and optionally observation and reward.

    Args:
        env (garage.envs.GarageEnv): An environment instance.
        scale_reward (float): Scale of environment reward.
        normalize_obs (bool): If True, normalize observation.
        normalize_reward (bool): If True, normalize reward. scale_reward is
            applied after normalization.
        expected_action_scale (float): Assume that the action falls in the
            range [-expected_action_scale, expected_action_scale] when
            normalizing it.
        flatten_obs (bool): Flatten observation if True.
        obs_alpha (float): Update rate of moving average when estimating the
            mean and variance of observations.
        reward_alpha (float): Update rate of moving average when estimating
            the mean and variance of rewards.

    """

    def __init__(
            self,
            env,
            scale_reward=1.,
            normalize_obs=False,
            normalize_reward=False,
            expected_action_scale=1.,
            flatten_obs=True,
            obs_alpha=0.001,
            reward_alpha=0.001,
    ):
        super().__init__(env)

        self._scale_reward = scale_reward
        self._normalize_obs = normalize_obs
        self._normalize_reward = normalize_reward
        self._expected_action_scale = expected_action_scale
        self._flatten_obs = flatten_obs

        self._obs_alpha = obs_alpha
        flat_obs_dim = gym.spaces.utils.flatdim(env.observation_space)
        self._obs_mean = np.zeros(flat_obs_dim)
        self._obs_var = np.ones(flat_obs_dim)

        self._reward_alpha = reward_alpha
        self._reward_mean = 0.
        self._reward_var = 1.

    def _update_obs_estimate(self, obs):
        # Exponential moving average of the (flattened) observation mean and
        # variance, with update rate obs_alpha.
        flat_obs = gym.spaces.utils.flatten(self.env.observation_space, obs)
        self._obs_mean = (1 - self._obs_alpha) * self._obs_mean \
            + self._obs_alpha * flat_obs
        self._obs_var = (1 - self._obs_alpha) * self._obs_var \
            + self._obs_alpha * np.square(flat_obs - self._obs_mean)

    def _update_reward_estimate(self, reward):
        # Exponential moving average of the reward mean and variance, with
        # update rate reward_alpha.
        self._reward_mean = (1 - self._reward_alpha) * self._reward_mean \
            + self._reward_alpha * reward
        self._reward_var = (1 - self._reward_alpha) * self._reward_var \
            + self._reward_alpha * np.square(reward - self._reward_mean)

    def _apply_normalize_obs(self, obs):
        """Compute normalized observation.

        Args:
            obs (np.ndarray): Observation.

        Returns:
            np.ndarray: Normalized observation.

        """
        self._update_obs_estimate(obs)
        flat_obs = gym.spaces.utils.flatten(self.env.observation_space, obs)
        normalized_obs = (flat_obs
                          - self._obs_mean) / (np.sqrt(self._obs_var) + 1e-8)
        if not self._flatten_obs:
            normalized_obs = gym.spaces.utils.unflatten(
                self.env.observation_space, normalized_obs)
        return normalized_obs

    def _apply_normalize_reward(self, reward):
        """Compute normalized reward.

        Args:
            reward (float): Reward.

        Returns:
            float: Normalized reward.

        """
        self._update_reward_estimate(reward)
        return reward / (np.sqrt(self._reward_var) + 1e-8)
    def reset(self, **kwargs):
        """Reset environment.

        Args:
            **kwargs: Additional parameters for reset.

        Returns:
            np.ndarray: The initial observation of the environment,
                normalized if `normalize_obs` is True.

        """
        ret = self.env.reset(**kwargs)
        if self._normalize_obs:
            return self._apply_normalize_obs(ret)
        else:
            return ret
    def step(self, action):
        """Feed environment with one step of action and get result.

        Args:
            action (np.ndarray): An action fed to the environment.

        Returns:
            tuple:
                * observation (np.ndarray): The observation of the
                    environment.
                * reward (float): The reward acquired at this time step.
                * done (boolean): Whether the environment was completed at
                    this time step.
                * infos (dict): Environment-dependent additional information.

        """
        if isinstance(self.action_space, gym.spaces.Box):
            # Rescale the action from
            # [-expected_action_scale, expected_action_scale] to the
            # environment's own bounds when those bounds are finite.
            lb, ub = self.action_space.low, self.action_space.high
            if np.all(lb != -np.inf) and np.all(ub != np.inf):
                scaled_action = lb + (
                    action + self._expected_action_scale) * (
                        0.5 * (ub - lb) / self._expected_action_scale)
                scaled_action = np.clip(scaled_action, lb, ub)
            else:
                scaled_action = action
        else:
            scaled_action = action

        next_obs, reward, done, info = self.env.step(scaled_action)

        if self._normalize_obs:
            next_obs = self._apply_normalize_obs(next_obs)
        if self._normalize_reward:
            reward = self._apply_normalize_reward(reward)

        return next_obs, reward * self._scale_reward, done, info
normalize = NormalizedEnv
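
# Usage sketch (illustrative only, not part of the original module). It
# assumes a standard continuous-control Gym task such as 'Pendulum-v0' is
# installed; the variable names below are hypothetical.
if __name__ == '__main__':
    env = normalize(gym.make('Pendulum-v0'),
                    normalize_obs=True,
                    normalize_reward=True,
                    scale_reward=10.)

    obs = env.reset()
    for _ in range(5):
        # The wrapper expects actions in [-expected_action_scale,
        # expected_action_scale] (here [-1, 1]) and rescales them to the
        # underlying environment's action bounds inside step().
        action = np.random.uniform(-1., 1., size=env.action_space.shape)
        obs, reward, done, _ = env.step(action)
        if done:
            obs = env.reset()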