Source code for garage.sampler.parallel_vec_env_executor

"""Environment wrapper that runs multiple environments in parallel."""

import pickle
import uuid

from dowel import logger
import numpy as np

from garage.misc import tensor_utils
from garage.sampler.stateful_pool import singleton_pool


def worker_init_envs(g, alloc, scope, env):
    """Initialize the environment on a worker."""
    logger.log('initializing environment on worker %d' % g.worker_id)
    if not hasattr(g, 'parallel_vec_envs'):
        g.parallel_vec_envs = dict()
        g.parallel_vec_env_template = dict()
    g.parallel_vec_envs[scope] = [(idx, pickle.loads(pickle.dumps(env)))
                                  for idx in alloc]
    g.parallel_vec_env_template[scope] = env
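

# Hedged illustration (not garage code): the pickle round-trip above is a
# deep copy, so each slot in parallel_vec_envs holds an independent
# environment instance rather than a shared reference.
def _example_pickle_round_trip():
    state = {'t': 0}
    copied = pickle.loads(pickle.dumps(state))
    copied['t'] += 1
    assert state['t'] == 0  # mutating the copy leaves the original intact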


# For the two functions below, we pack the data into batched numpy arrays
# whenever possible, to reduce communication cost.


def worker_run_reset(g, flags, scope):
    """Reset the environment on a worker."""
    if not hasattr(g, 'parallel_vec_envs'):
        logger.log('on worker %d' % g.worker_id)
        import traceback
        for line in traceback.format_stack():
            logger.log(line)
        # log the stacktrace at least
        logger.log('oops')
        for k, v in g.__dict__.items():
            logger.log(str(k) + ' : ' + str(v))
        assert hasattr(g, 'parallel_vec_envs')
    assert scope in g.parallel_vec_envs

    n = len(g.parallel_vec_envs[scope])
    env_template = g.parallel_vec_env_template[scope]
    obs_dim = env_template.observation_space.flat_dim

    ret_arr = np.zeros((n, obs_dim))
    ids = []
    flat_obs = []
    reset_ids = []
    for itr_idx, (idx, env) in enumerate(g.parallel_vec_envs[scope]):
        flag = flags[idx]
        if flag:
            flat_obs.append(env.reset())
            reset_ids.append(itr_idx)
        ids.append(idx)
    if reset_ids:
        ret_arr[reset_ids] = env_template.observation_space.flatten_n(flat_obs)
    return ids, ret_arr
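

# Hedged sketch (illustrative only, simplified to a single worker):
# worker_run_reset packs fresh observations into one (n, obs_dim) array
# and fills only the rows whose reset flag is set, so a single numpy
# array crosses the process boundary instead of a list of per-env
# objects.
def _example_batched_reset_packing():
    n, obs_dim = 4, 3
    flags = np.array([True, False, True, False])
    ret_arr = np.zeros((n, obs_dim))
    fresh_obs = np.ones((flags.sum(), obs_dim))  # stand-in for env.reset()
    ret_arr[np.where(flags)[0]] = fresh_obs
    return ret_arr  # rows 0 and 2 are filled; rows 1 and 3 stay zero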


def worker_run_step(g, action_n, scope):
    """Step the environment on a worker."""
    assert hasattr(g, 'parallel_vec_envs')
    assert scope in g.parallel_vec_envs
    env_template = g.parallel_vec_env_template[scope]
    ids = []
    step_results = []
    for (idx, env) in g.parallel_vec_envs[scope]:
        action = action_n[idx]
        ids.append(idx)
        step_results.append(tuple(env.step(action)))
    if not step_results:
        return None
    obs, rewards, dones, env_infos = list(map(list, list(zip(*step_results))))
    obs = env_template.observation_space.flatten_n(obs)
    rewards = np.asarray(rewards)
    dones = np.asarray(dones)
    env_infos = tensor_utils.stack_tensor_dict_list(env_infos)
    return ids, obs, rewards, dones, env_infos
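

# Hedged sketch (illustrative only): zip(*step_results) above transposes
# a list of per-env (obs, reward, done, info) tuples into four parallel
# sequences, which are then batched before being sent back to the parent
# process.
def _example_step_result_transpose():
    step_results = [
        (np.zeros(3), 1.0, False, {'a': 1}),
        (np.ones(3), 2.0, True, {'a': 2}),
    ]
    obs, rewards, dones, env_infos = map(list, zip(*step_results))
    assert np.asarray(rewards).tolist() == [1.0, 2.0]
    assert tensor_utils.stack_tensor_dict_list(env_infos)['a'].tolist() == [
        1, 2
    ]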


class ParallelVecEnvExecutor:
    """Environment wrapper that runs multiple environments in parallel."""

    def __init__(self, env, n, max_path_length, scope=None):
        if scope is None:
            # initialize random scope
            scope = str(uuid.uuid4())

        envs_per_worker = int(np.ceil(n * 1.0 / singleton_pool.n_parallel))
        alloc_env_ids = []
        rest_alloc = n
        start_id = 0
        for _ in range(singleton_pool.n_parallel):
            n_allocs = min(envs_per_worker, rest_alloc)
            alloc_env_ids.append(list(range(start_id, start_id + n_allocs)))
            start_id += n_allocs
            rest_alloc = max(0, rest_alloc - envs_per_worker)

        singleton_pool.run_each(worker_init_envs,
                                [(alloc, scope, env)
                                 for alloc in alloc_env_ids])

        self._alloc_env_ids = alloc_env_ids
        self._action_space = env.action_space
        self._observation_space = env.observation_space
        self._num_envs = n
        self.scope = scope
        self.ts = np.zeros(n, dtype='int')
        self.max_path_length = max_path_length
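
    # Worked example of the allocation above (illustrative numbers, not
    # garage code): with n = 5 environments and
    # singleton_pool.n_parallel = 2 workers, envs_per_worker is
    # ceil(5 / 2) = 3, so the loop yields
    # alloc_env_ids = [[0, 1, 2], [3, 4]]: worker 0 owns environments
    # 0-2 and worker 1 owns environments 3-4.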

    def step(self, action_n):
        """Step all environments using the provided actions."""
        results = singleton_pool.run_each(
            worker_run_step,
            [(action_n, self.scope) for _ in self._alloc_env_ids],
        )
        # Drop workers that own no environments.
        results = [x for x in results if x is not None]
        ids, obs, rewards, dones, env_infos = list(zip(*results))
        ids = np.concatenate(ids)
        obs = self.observation_space.unflatten_n(np.concatenate(obs))
        rewards = np.concatenate(rewards)
        dones = np.concatenate(dones)
        env_infos = tensor_utils.split_tensor_dict_list(
            tensor_utils.concat_tensor_dict_list(env_infos))
        if env_infos is None:
            env_infos = [dict() for _ in range(self.num_envs)]

        # Restore global environment order, since workers return results
        # for their own slices.
        items = list(zip(ids, obs, rewards, dones, env_infos))
        items = sorted(items, key=lambda x: x[0])
        ids, obs, rewards, dones, env_infos = list(zip(*items))

        obs = list(obs)
        rewards = np.asarray(rewards)
        dones = np.asarray(dones)

        # Enforce the horizon: force done once an env hits max_path_length.
        self.ts += 1
        dones[self.ts >= self.max_path_length] = True

        # Reset finished environments and return their fresh observations.
        reset_obs = self._run_reset(dones)
        for (i, done) in enumerate(dones):
            if done:
                obs[i] = reset_obs[i]
                self.ts[i] = 0

        return obs, rewards, dones, tensor_utils.stack_tensor_dict_list(
            list(env_infos))
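
    # Descriptive note (not garage code): self.ts counts steps per
    # environment, any environment reaching max_path_length is forced
    # done, and done environments are reset in place, so the returned
    # obs[i] for a done environment is the first observation of its next
    # episode.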

    def _run_reset(self, dones):
        """Reset the environments flagged in dones and return their
        observations."""
        dones = np.asarray(dones)
        results = singleton_pool.run_each(
            worker_run_reset,
            [(dones, self.scope) for _ in self._alloc_env_ids],
        )
        ids, flat_obs = list(map(np.concatenate, list(zip(*results))))
        zipped = list(zip(ids, flat_obs))
        sorted_obs = np.asarray(
            [x[1] for x in sorted(zipped, key=lambda x: x[0])])

        done_ids, = np.where(dones)
        done_flat_obs = sorted_obs[done_ids]
        done_unflat_obs = self.observation_space.unflatten_n(done_flat_obs)

        all_obs = [None] * self.num_envs
        done_cursor = 0
        for idx, done in enumerate(dones):
            if done:
                all_obs[idx] = done_unflat_obs[done_cursor]
                done_cursor += 1
        return all_obs

    def reset(self):
        """Reset all environments."""
        dones = np.asarray([True] * self.num_envs)
        return self._run_reset(dones)

    @property
    def num_envs(self):
        """Read the number of environments."""
        return self._num_envs

    @property
    def action_space(self):
        """Read the action space."""
        return self._action_space

    @property
    def observation_space(self):
        """Read the observation space."""
        return self._observation_space

    def close(self):
        """Close all environments."""
        pass
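

# Hedged usage sketch (not from the garage docs): `make_env` is a
# hypothetical factory, the pool size is arbitrary, and
# `singleton_pool.initialize` is assumed to be the stateful pool's setup
# call. The wrapped env must expose akro-style spaces (flat_dim,
# flatten_n, unflatten_n).
if __name__ == '__main__':
    singleton_pool.initialize(n_parallel=2)  # assumed pool setup API
    env = make_env()  # hypothetical: any garage-compatible environment
    vec_env = ParallelVecEnvExecutor(env, n=4, max_path_length=100)
    obs = vec_env.reset()
    actions = [vec_env.action_space.sample() for _ in range(vec_env.num_envs)]
    obs, rewards, dones, env_infos = vec_env.step(actions)
    vec_env.close()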