Source code for garage.envs.garage_env

"""Wrapper class that converts gym.Env into GarageEnv."""

import copy

import akro
import gym

from garage.envs.env_spec import EnvSpec

# The gym environments using one of the packages in the following lists as
# entry points don't close their viewer windows.
KNOWN_GYM_NOT_CLOSE_VIEWER = [
    # Please keep alphabetized
    'gym.envs.atari',
    'gym.envs.box2d',
    'gym.envs.classic_control'
]

KNOWN_GYM_NOT_CLOSE_MJ_VIEWER = [
    # Please keep alphabetized
    'gym.envs.mujoco',
    'gym.envs.robotics'
]


[docs]class GarageEnv(gym.Wrapper): """Returns an abstract Garage wrapper class for gym.Env. In order to provide pickling (serialization) and parameterization for gym.Envs, they must be wrapped with a GarageEnv. This ensures compatibility with existing samplers and checkpointing when the envs are passed internally around garage. Furthermore, classes inheriting from GarageEnv should silently convert action_space and observation_space from gym.Spaces to akro.spaces. Args: env (gym.Env): An env that will be wrapped env_name (str): If the env_name is speficied, a gym environment with that name will be created. If such an environment does not exist, a `gym.error` is thrown. is_image (bool): True if observations contain pixel values, false otherwise. Setting this to true converts a gym.Spaces.Box obs space to an akro.Image and normalizes pixel values. """ def __init__(self, env=None, env_name='', is_image=False): # Needed for deserialization self._env_name = env_name self._env = env if env_name: super().__init__(gym.make(env_name)) else: super().__init__(env) self.action_space = akro.from_gym(self.env.action_space) self.observation_space = akro.from_gym(self.env.observation_space, is_image=is_image) self.__spec = EnvSpec(action_space=self.action_space, observation_space=self.observation_space) @property def spec(self): """Return the environment specification. This property needs to exist, since it's defined as a property in gym.Wrapper in a way that makes it difficult to overwrite. Returns: garage.envs.env_spec.EnvSpec: The envionrment specification. """ return self.__spec
[docs] def close(self): """Close the wrapped env.""" self._close_viewer_window() self.env.close()
def _close_viewer_window(self): """Close viewer window. Unfortunately, some gym environments don't close the viewer windows properly, which leads to "out of memory" issues when several of these environments are tested one after the other. This method searches for the viewer object of type MjViewer, Viewer or SimpleImageViewer, based on environment, and if the environment is wrapped in other environment classes, it performs depth search in those as well. This method can be removed once OpenAI solves the issue. """ # We need to do some strange things here to fix-up flaws in gym # pylint: disable=import-outside-toplevel if self.env.spec: if any(package in getattr(self.env.spec, 'entry_point', '') for package in KNOWN_GYM_NOT_CLOSE_MJ_VIEWER): # This import is not in the header to avoid a MuJoCo dependency # with non-MuJoCo environments that use this base class. try: from mujoco_py.mjviewer import MjViewer import glfw except ImportError: # If we can't import mujoco_py, we must not have an # instance of a class that we know how to close here. return if (hasattr(self.env, 'viewer') and isinstance(self.env.viewer, MjViewer)): glfw.destroy_window(self.env.viewer.window) elif any(package in getattr(self.env.spec, 'entry_point', '') for package in KNOWN_GYM_NOT_CLOSE_VIEWER): if hasattr(self.env, 'viewer'): from gym.envs.classic_control.rendering import ( Viewer, SimpleImageViewer) if (isinstance(self.env.viewer, (SimpleImageViewer, Viewer))): self.env.viewer.close()
[docs] def reset(self, **kwargs): """Call reset on wrapped env. This method is necessary to suppress a deprecated warning thrown by gym.Wrapper. Args: kwargs: Keyword args Returns: object: The initial observation. """ return self.env.reset(**kwargs)
[docs] def step(self, action): """Call step on wrapped env. This method is necessary to suppress a deprecated warning thrown by gym.Wrapper. Args: action (object): An action provided by the agent. Returns: object: Agent's observation of the current environment float : Amount of reward returned after previous action bool : Whether the episode has ended, in which case further step() calls will return undefined results dict: Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning) """ observation, reward, done, info = self.env.step(action) # gym envs that are wrapped in TimeLimit wrapper modify # the done/termination signal to be true whenever a time # limit expiration occurs. The following statement sets # the done signal to be True only if caused by an # environment termination, and not a time limit # termination. The time limit termination signal # will be saved inside env_infos as # 'GarageEnv.TimeLimitTerminated' if 'TimeLimit.truncated' in info: info['GarageEnv.TimeLimitTerminated'] = info['TimeLimit.truncated'] if info['TimeLimit.truncated']: done = False else: info['TimeLimit.truncated'] = False info['GarageEnv.TimeLimitTerminated'] = False return observation, reward, done, info
def __getstate__(self): """See `Object.__getstate__. Returns: dict: The instance’s dictionary to be pickled. """ # the viewer object is not pickleable # we first make a copy of the viewer env = self.env # get the inner env if it is a gym.Wrapper if issubclass(env.__class__, gym.Wrapper): env = env.unwrapped if 'viewer' in env.__dict__: _viewer = env.viewer # remove the viewer and make a copy of the state env.viewer = None state = copy.deepcopy(self.__dict__) # assign the viewer back to self.__dict__ env.viewer = _viewer # the returned state doesn't have the viewer return state return self.__dict__ def __setstate__(self, state): """See `Object.__setstate__. Args: state (dict): Unpickled state of this object. """ self.__init__(state['_env'], state['_env_name'])