Source code for garage.sampler.off_policy_vectorized_sampler

"""This module implements a Vectorized Sampler used for OffPolicy Algorithms.

It diffs from OnPolicyVectorizedSampler in two parts:
 - The num of envs is defined by rollout_batch_size. In
 OnPolicyVectorizedSampler, the number of envs can be decided by batch_size
 and max_path_length. But OffPolicy algorithms usually samples transitions
 from replay buffer, which only has buffer_batch_size.
 - It needs to add transitions to replay buffer throughout the rollout.
"""
import itertools
import pickle

import numpy as np

from garage.experiment import deterministic
from garage.misc import tensor_utils
from garage.sampler.batch_sampler import BatchSampler
from garage.sampler.vec_env_executor import VecEnvExecutor


class OffPolicyVectorizedSampler(BatchSampler):
    """This class implements OffPolicyVectorizedSampler.

    Args:
        algo (garage.np.RLAlgorithm): Algorithm.
        env (garage.envs.GarageEnv): Environment.
        n_envs (int): Number of parallel environments managed by sampler.
        no_reset (bool): Reset environment between samples or not.

    """

    def __init__(self, algo, env, n_envs=None, no_reset=True):
        if n_envs is None:
            n_envs = int(algo.rollout_batch_size)
        super().__init__(algo, env)
        self.n_envs = n_envs
        self.no_reset = no_reset

        self._last_obses = None
        self._last_uncounted_discount = [0] * n_envs
        self._last_running_length = [0] * n_envs
        self._last_success_count = [0] * n_envs

        self.env_spec = self.env.spec
        self.vec_env = None

    def start_worker(self):
        """Initialize the sampler."""
        n_envs = self.n_envs
        envs = [pickle.loads(pickle.dumps(self.env)) for _ in range(n_envs)]

        # Deterministically set environment seeds based on the global seed.
        for (i, e) in enumerate(envs):
            e.seed(deterministic.get_seed() + i)

        self.vec_env = VecEnvExecutor(
            envs=envs, max_path_length=self.algo.max_path_length)
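
A minimal sketch of the copy-and-seed pattern used in start_worker, shown outside the class. `base_env` is a hypothetical environment instance with a gym-style seed(); only get_seed from garage.experiment.deterministic, which is already imported above, is assumed:

    copies = [pickle.loads(pickle.dumps(base_env)) for _ in range(4)]
    for i, e in enumerate(copies):
        # Offset the global seed so each copy produces a distinct stream.
        e.seed(deterministic.get_seed() + i)
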

    def shutdown_worker(self):
        """Terminate workers if necessary."""
        self.vec_env.close()

    # pylint: disable=arguments-differ, too-many-statements, too-many-branches
    def obtain_samples(self, itr, batch_size):
        """Collect samples for the given iteration number.

        Args:
            itr (int): Iteration number.
            batch_size (int): Number of environment interactions in one batch.

        Returns:
            list: A list of paths.

        """
        paths = []
        if not self.no_reset or self._last_obses is None:
            obses = self.vec_env.reset()
        else:
            obses = self._last_obses
        dones = np.asarray([True] * self.vec_env.num_envs)
        running_paths = [None] * self.vec_env.num_envs
        n_samples = 0

        policy = self.algo.policy
        if self.algo.es:
            self.algo.es.reset()

        while n_samples < batch_size:
            policy.reset(dones)
            if self.algo.input_include_goal:
                obs = [obs['observation'] for obs in obses]
                d_g = [obs['desired_goal'] for obs in obses]
                a_g = [obs['achieved_goal'] for obs in obses]
                input_obses = np.concatenate((obs, d_g), axis=-1)
            else:
                input_obses = obses
            obs_normalized = tensor_utils.normalize_pixel_batch(
                self.env_spec, input_obses)

            if self.algo.es:
                actions, agent_infos = self.algo.es.get_actions(
                    itr, obs_normalized, self.algo.policy)
            else:
                actions, agent_infos = self.algo.policy.get_actions(
                    obs_normalized)

            next_obses, rewards, dones, env_infos = self.vec_env.step(actions)
            self._last_obses = next_obses
            agent_infos = tensor_utils.split_tensor_dict_list(agent_infos)
            env_infos = tensor_utils.split_tensor_dict_list(env_infos)
            n_samples += len(next_obses)

            if agent_infos is None:
                agent_infos = [dict() for _ in range(self.vec_env.num_envs)]
            if env_infos is None:
                env_infos = [dict() for _ in range(self.vec_env.num_envs)]

            if self.algo.input_include_goal:
                self.algo.replay_buffer.add_transitions(
                    observation=obs,
                    action=actions,
                    goal=d_g,
                    achieved_goal=a_g,
                    terminal=dones,
                    next_observation=[
                        next_obs['observation'] for next_obs in next_obses
                    ],
                    next_achieved_goal=[
                        next_obs['achieved_goal'] for next_obs in next_obses
                    ],
                )
            else:
                self.algo.replay_buffer.add_transitions(
                    observation=obses,
                    action=actions,
                    reward=rewards * self.algo.reward_scale,
                    terminal=dones,
                    next_observation=next_obses,
                )

            for idx, reward, env_info, done in zip(itertools.count(), rewards,
                                                   env_infos, dones):
                if running_paths[idx] is None:
                    running_paths[idx] = dict(
                        rewards=[],
                        env_infos=[],
                        dones=[],
                        undiscounted_return=self._last_uncounted_discount[idx],
                        # running_length: length of the path so far.
                        # Note that running_length is not len(rewards),
                        # because a path may not be completed in one batch.
                        running_length=self._last_running_length[idx],
                        success_count=self._last_success_count[idx])

                running_paths[idx]['rewards'].append(reward)
                running_paths[idx]['env_infos'].append(env_info)
                running_paths[idx]['dones'].append(done)
                running_paths[idx]['running_length'] += 1
                running_paths[idx]['undiscounted_return'] += reward
                running_paths[idx]['success_count'] += env_info.get(
                    'is_success') or 0

                self._last_uncounted_discount[idx] += reward
                self._last_success_count[idx] += env_info.get(
                    'is_success') or 0
                self._last_running_length[idx] += 1

                if done or n_samples >= batch_size:
                    paths.append(
                        dict(
                            rewards=np.asarray(running_paths[idx]['rewards']),
                            dones=np.asarray(running_paths[idx]['dones']),
                            env_infos=tensor_utils.stack_tensor_dict_list(
                                running_paths[idx]['env_infos']),
                            running_length=running_paths[idx]
                            ['running_length'],
                            undiscounted_return=running_paths[idx]
                            ['undiscounted_return'],
                            success_count=running_paths[idx]['success_count']))
                    running_paths[idx] = None

                    if done:
                        self._last_running_length[idx] = 0
                        self._last_success_count[idx] = 0
                        self._last_uncounted_discount[idx] = 0

                    if self.algo.es:
                        self.algo.es.reset()
            obses = next_obses

        return paths
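
A hedged end-to-end sketch of the sampler lifecycle, assuming an already-constructed off-policy `algo` and `env` (both placeholders; the batch size of 500 is arbitrary). Only methods defined in this module are used:

    sampler = OffPolicyVectorizedSampler(algo, env)
    sampler.start_worker()
    paths = sampler.obtain_samples(itr=0, batch_size=500)
    # Each path is a dict with 'rewards', 'dones', 'env_infos',
    # 'running_length', 'undiscounted_return' and 'success_count';
    # transitions were already pushed to algo.replay_buffer during sampling.
    sampler.shutdown_worker()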