Source code for garage.sampler.on_policy_vectorized_sampler

"""BatchSampler which uses VecEnvExecutor to run multiple environments."""
import itertools
import time
import warnings

import click
import cloudpickle
from dowel import logger, tabular
import numpy as np

from garage.experiment import deterministic
from garage.misc import tensor_utils
from garage.sampler.batch_sampler import BatchSampler
from garage.sampler.stateful_pool import singleton_pool
from garage.sampler.utils import truncate_paths
from garage.sampler.vec_env_executor import VecEnvExecutor


class OnPolicyVectorizedSampler(BatchSampler):
    """BatchSampler which uses VecEnvExecutor to run multiple environments.

    Args:
        algo (garage.np.algos.RLAlgorithm): An algorithm instance.
        env (garage.envs.GarageEnv): An environment instance.
        n_envs (int): Number of environment instances to set up. This
            parameter affects sampling performance.

    """

    def __init__(self, algo, env, n_envs=None):
        if n_envs is None:
            n_envs = singleton_pool.n_parallel * 4
        super().__init__(algo, env)
        self._n_envs = n_envs

        self._vec_env = None
        self._env_spec = self.env.spec

        warnings.warn(
            DeprecationWarning(
                'OnPolicyVectorizedSampler is deprecated, and will be '
                'removed in the next release. Please use VecWorker and one of '
                'the new samplers which implement garage.sampler.Sampler, '
                'such as RaySampler.'))
    def start_worker(self):
        """Start workers."""
        n_envs = self._n_envs
        envs = [
            cloudpickle.loads(cloudpickle.dumps(self.env))
            for _ in range(n_envs)
        ]

        # Deterministically set environment seeds based on the global seed.
        seed0 = deterministic.get_seed()
        if seed0 is not None:
            for (i, e) in enumerate(envs):
                e.seed(seed0 + i)

        self._vec_env = VecEnvExecutor(
            envs=envs, max_path_length=self.algo.max_path_length)
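
    # For example, with a global seed of 7 and four environment copies,
    # start_worker seeds the copies 7, 8, 9 and 10, so runs that share a
    # global seed see identical per-environment randomness.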
    def shutdown_worker(self):
        """Shutdown workers."""
        self._vec_env.close()
    # pylint: disable=too-many-statements
    def obtain_samples(self, itr, batch_size=None, whole_paths=True):
        """Sample the policy for new trajectories.

        Args:
            itr (int): Iteration number.
            batch_size (int): Number of samples to be collected. If None,
                it defaults to algo.max_path_length * n_envs.
            whole_paths (bool): Whether to return all the paths or not.
                True by default. The paths may have a total sample size
                larger than batch_size; they are truncated to batch_size
                when this flag is False.

        Returns:
            list[dict]: Sample paths.

        Note:
            Each path is a dictionary with the following keys and values:

            * observations: numpy.ndarray with shape [Batch, *obs_dims]
            * actions: numpy.ndarray with shape [Batch, *act_dims]
            * rewards: numpy.ndarray with shape [Batch, ]
            * env_infos: A dictionary with each key representing one
              environment info, value being a numpy.ndarray with shape
              [Batch, ?]. One example is "ale.lives" for Atari environments.
            * agent_infos: A dictionary with each key representing one
              agent info, value being a numpy.ndarray with shape
              [Batch, ?]. One example is "prev_action", which is used
              by recurrent policies as the previous action input, merged
              with the observation input as the state input.
            * dones: numpy.ndarray with shape [Batch, ]

        """
        logger.log('Obtaining samples for iteration %d...' % itr)

        if not batch_size:
            batch_size = self.algo.max_path_length * self._n_envs

        paths = []
        n_samples = 0
        obses = self._vec_env.reset()
        completes = np.asarray([True] * self._vec_env.num_envs)
        running_paths = [None] * self._vec_env.num_envs

        policy_time = 0
        env_time = 0
        process_time = 0

        policy = self.algo.policy

        with click.progressbar(length=batch_size, label='Sampling') as pbar:
            while n_samples < batch_size:
                t = time.time()
                policy.reset(completes)

                actions, agent_infos = policy.get_actions(obses)

                policy_time += time.time() - t
                t = time.time()
                next_obses, rewards, dones, env_infos, completes = \
                    self._vec_env.step(actions)
                env_time += time.time() - t
                t = time.time()

                agent_infos = tensor_utils.split_tensor_dict_list(agent_infos)
                env_infos = tensor_utils.split_tensor_dict_list(env_infos)
                if env_infos is None:
                    env_infos = [dict() for _ in range(self._vec_env.num_envs)]
                if agent_infos is None:
                    agent_infos = [
                        dict() for _ in range(self._vec_env.num_envs)
                    ]
                for idx, observation, action, reward, env_info, agent_info, done, complete in zip(  # noqa: E501
                        itertools.count(), obses, actions, rewards, env_infos,
                        agent_infos, dones, completes):
                    if running_paths[idx] is None:
                        running_paths[idx] = dict(observations=[],
                                                  actions=[],
                                                  rewards=[],
                                                  env_infos=[],
                                                  agent_infos=[],
                                                  dones=[])
                    # Append this step's transition to the running path for
                    # environment idx.
                    running_paths[idx]['observations'].append(observation)
                    running_paths[idx]['actions'].append(action)
                    running_paths[idx]['rewards'].append(reward)
                    running_paths[idx]['env_infos'].append(env_info)
                    running_paths[idx]['agent_infos'].append(agent_info)
                    running_paths[idx]['dones'].append(done)
                    if complete:
                        # Path finished: stack the per-step records into
                        # arrays and start a fresh path for this environment.
                        obs = np.asarray(running_paths[idx]['observations'])
                        actions = np.asarray(running_paths[idx]['actions'])
                        paths.append(
                            dict(observations=obs,
                                 actions=actions,
                                 rewards=np.asarray(
                                     running_paths[idx]['rewards']),
                                 env_infos=tensor_utils.stack_tensor_dict_list(
                                     running_paths[idx]['env_infos']),
                                 agent_infos=tensor_utils.
                                 stack_tensor_dict_list(
                                     running_paths[idx]['agent_infos']),
                                 dones=np.asarray(
                                     running_paths[idx]['dones'])))
                        n_samples += len(running_paths[idx]['rewards'])
                        running_paths[idx] = None

                process_time += time.time() - t
                pbar.update(len(obses))
                obses = next_obses

        tabular.record('PolicyExecTime', policy_time)
        tabular.record('EnvExecTime', env_time)
        tabular.record('ProcessExecTime', process_time)

        return paths if whole_paths else truncate_paths(paths, batch_size)
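
The docstrings above describe the sampler's lifecycle and the structure of the paths it returns. The sketch below is a minimal, hypothetical illustration rather than code from this module: my_algo and my_env are placeholders for a garage RLAlgorithm (exposing policy and max_path_length) and a GarageEnv, and in normal use the experiment runner drives these calls rather than user code.

# Hypothetical usage sketch -- my_algo and my_env are placeholders, not
# objects defined in this module.
from garage.experiment import deterministic

deterministic.set_seed(7)  # gives each env copy a distinct, reproducible seed
sampler = OnPolicyVectorizedSampler(my_algo, my_env, n_envs=4)
sampler.start_worker()                      # builds the VecEnvExecutor
paths = sampler.obtain_samples(itr=0, batch_size=4000)
sampler.shutdown_worker()                   # closes the vectorized envs

# Each path is a dict of arrays stacked over the steps of one trajectory,
# matching the obtain_samples docstring.
first = paths[0]
print(first['observations'].shape)          # (T, *obs_dims)
print(first['actions'].shape)               # (T, *act_dims)
print(first['rewards'].shape)               # (T,)
print(first['dones'].shape)                 # (T,)
print(sorted(first['env_infos']), sorted(first['agent_infos']))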