"""This module implements a Vectorized Sampler used for OffPolicy Algorithms.

It diffs from OnPolicyVectorizedSampler in two parts:
 - The num of envs is defined by rollout_batch_size. In
 OnPolicyVectorizedSampler, the number of envs can be decided by batch_size
 and max_path_length. But OffPolicy algorithms usually samples transitions
 from replay buffer, which only has buffer_batch_size.
 - It needs to add transitions to replay buffer throughout the rollout.

import itertools
import warnings

import cloudpickle
import numpy as np

from garage.experiment import deterministic
from garage.misc import tensor_utils
from garage.sampler.batch_sampler import BatchSampler
from garage.sampler.vec_env_executor import VecEnvExecutor

class OffPolicyVectorizedSampler(BatchSampler):
    """This class implements OffPolicyVectorizedSampler.

    The sampler steps several copies of the environment in lockstep with the
    algorithm's exploration policy and pushes every finished (or truncated)
    path into the algorithm's replay buffer.

    Args:
        algo (object): Algorithm. This sampler reads its
            `rollout_batch_size`, `max_path_length`, `exploration_policy`,
            `env_spec` and `replay_buffer` attributes.
        env (garage.envs.GarageEnv): Environment.
        n_envs (int): Number of parallel environments managed by sampler.
            Defaults to `int(algo.rollout_batch_size)` when None.
        no_reset (bool): If True, successive calls to `obtain_samples`
            continue from the observations left by the previous call
            instead of resetting the environments.

    """

    def __init__(self, algo, env, n_envs=None, no_reset=True):
        if n_envs is None:
            n_envs = int(algo.rollout_batch_size)
        super().__init__(algo, env)
        self._n_envs = n_envs
        self._no_reset = no_reset

        # Created lazily by start_worker().
        self._vec_env = None
        self._env_spec = self.env.spec

        # Per-environment state carried over between obtain_samples() calls
        # so that a path spanning several batches keeps consistent totals.
        self._last_obses = None
        self._last_uncounted_discount = [0] * n_envs
        self._last_running_length = [0] * n_envs
        self._last_success_count = [0] * n_envs

        warnings.warn(
            DeprecationWarning(
                'OffPolicyVectorizedSampler is deprecated, and will be '
                'removed in the next release. Please use VecWorker and one of '
                'the new samplers which implement garage.sampler.Sampler, '
                'such as RaySampler.'))

    def start_worker(self):
        """Initialize the sampler."""
        # Serialize the environment once and deserialize it n_envs times to
        # obtain independent copies.
        pickled_env = cloudpickle.dumps(self.env)
        envs = [cloudpickle.loads(pickled_env) for _ in range(self._n_envs)]

        # Deterministically set environment seeds based on the global seed.
        seed0 = deterministic.get_seed()
        if seed0 is not None:
            for (i, e) in enumerate(envs):
                e.seed(seed0 + i)

        self._vec_env = VecEnvExecutor(
            envs=envs, max_path_length=self.algo.max_path_length)

    def shutdown_worker(self):
        """Terminate workers if necessary.

        Does nothing when `start_worker` has not been called.

        """
        if self._vec_env is not None:
            self._vec_env.close()

    def _new_running_path(self, idx):
        """Create an empty path accumulator for environment `idx`.

        Args:
            idx (int): Index of the environment.

        Returns:
            dict: Accumulator whose running totals start from the values
                carried over from previous batches.

        """
        return dict(
            rewards=[],
            observations=[],
            next_observations=[],
            actions=[],
            env_infos=[],
            dones=[],
            undiscounted_return=self._last_uncounted_discount[idx],
            # running_length: Length of path up to now
            # Note that running_length is not len(rewards)
            # Because a path may not be complete in one batch
            running_length=self._last_running_length[idx],
            success_count=self._last_success_count[idx])

    def _store_path(self, running_path, obs_space, act_space):
        """Add a path's transitions to the replay buffer and summarize it.

        Args:
            running_path (dict): Accumulator built by `_new_running_path`.
            obs_space (object): Observation space used to flatten
                observations.
            act_space (object): Action space used to flatten actions.

        Returns:
            dict: Path summary returned to the caller of `obtain_samples`.

        """
        summary = dict(
            rewards=np.asarray(running_path['rewards']),
            dones=np.asarray(running_path['dones']),
            env_infos=tensor_utils.stack_tensor_dict_list(
                running_path['env_infos']),
            running_length=running_path['running_length'],
            undiscounted_return=running_path['undiscounted_return'],
            success_count=running_path['success_count'])

        # The replay buffer gets flattened observations/actions and
        # column-shaped rewards/terminals.
        path_dict = {
            'observations':
            obs_space.flatten_n(running_path['observations']),
            'next_observations':
            obs_space.flatten_n(running_path['next_observations']),
            'rewards':
            np.asarray(running_path['rewards']).reshape(-1, 1),
            'terminals':
            np.asarray(running_path['dones']).reshape(-1, 1),
            'actions':
            act_space.flatten_n(running_path['actions']),
        }
        self.algo.replay_buffer.add_path(path_dict)
        return summary

    # pylint: disable=too-many-branches, too-many-statements
    def obtain_samples(self, itr, batch_size=None, whole_paths=True):
        """Collect samples for the given iteration number.

        Every path that completes, and every path still running when the
        batch is full, is added to the algorithm's replay buffer.

        Args:
            itr(int): Iteration number. Not used by this sampler.
            batch_size(int): Number of environment interactions in one batch.
                Must not be None.
            whole_paths(bool): Not effective. Only keep here to comply
                with base class.

        Raises:
            ValueError: If the algorithm doesn't have an exploration_policy
                field.

        Returns:
            list: A list of paths. Each path is a dict with the keys
                `rewards`, `dones`, `env_infos`, `running_length`,
                `undiscounted_return` and `success_count`.

        """
        assert batch_size is not None, 'batch_size must not be None'

        paths = []
        if not self._no_reset or self._last_obses is None:
            obses = self._vec_env.reset()
        else:
            obses = self._last_obses
        n_envs = self._vec_env.num_envs
        # Every environment counts as "complete" before the first step so
        # that the policy resets its state for all of them.
        completes = np.ones(n_envs, dtype=bool)
        running_paths = [None] * n_envs
        n_samples = 0

        policy = self.algo.exploration_policy
        if policy is None:
            raise ValueError('OffPolicyVectorizedSampler should only be used '
                             'with an exploration_policy.')

        # Both spaces are loop invariant.
        obs_space = self.algo.env_spec.observation_space
        act_space = self._env_spec.action_space

        while n_samples < batch_size:
            policy.reset(completes)
            input_obses = obs_space.flatten_n(obses)
            actions, agent_infos = policy.get_actions(input_obses)

            next_obses, rewards, dones, env_infos, completes = \
                self._vec_env.step(actions)
            self._last_obses = next_obses
            # agent_infos is split and defaulted but not recorded in paths.
            agent_infos = tensor_utils.split_tensor_dict_list(agent_infos)
            env_infos = tensor_utils.split_tensor_dict_list(env_infos)
            n_samples += len(next_obses)

            if agent_infos is None:
                agent_infos = [dict() for _ in range(n_envs)]
            if env_infos is None:
                env_infos = [dict() for _ in range(n_envs)]

            for (idx, reward, env_info, done, complete, obs, next_obs,
                 action) in zip(itertools.count(), rewards, env_infos, dones,
                                completes, obses, next_obses, actions):
                if running_paths[idx] is None:
                    running_paths[idx] = self._new_running_path(idx)
                running_path = running_paths[idx]

                running_path['rewards'].append(reward)
                running_path['observations'].append(obs)
                running_path['next_observations'].append(next_obs)
                running_path['actions'].append(action)
                running_path['env_infos'].append(env_info)
                running_path['dones'].append(done)

                # A missing or falsy 'is_success' entry counts as 0.
                success = env_info.get('is_success') or 0

                running_path['running_length'] += 1
                running_path['undiscounted_return'] += reward
                running_path['success_count'] += success

                self._last_uncounted_discount[idx] += reward
                self._last_success_count[idx] += success
                self._last_running_length[idx] += 1

                # Flush finished paths, and all running paths once the batch
                # is full.
                if complete or n_samples >= batch_size:
                    paths.append(
                        self._store_path(running_path, obs_space, act_space))
                    running_paths[idx] = None

                    # NOTE(review): assumes a terminal step (done) is always
                    # reported as complete by the executor - confirm.
                    if done:
                        self._last_running_length[idx] = 0
                        self._last_success_count[idx] = 0
                        self._last_uncounted_discount[idx] = 0

            obses = next_obses
        return paths