# Source code for garage.sampler.parallel_sampler

"""Original parallel sampler pool backend."""
import pickle
import signal

from dowel import logger
import numpy as np

from garage.experiment import deterministic
from garage.sampler.stateful_pool import SharedGlobal
from garage.sampler.stateful_pool import singleton_pool
from garage.sampler.utils import rollout

def _worker_init(g, id):
    """Prepare a freshly started worker.

    Stores the worker's identifier on its globals object and, when the pool
    runs more than one worker, sets ``CUDA_VISIBLE_DEVICES`` to an empty
    string in the worker's environment.

    :param g: worker globals object that receives the ``worker_id`` attribute
    :param id: identifier assigned to this worker
    """
    runs_in_parallel = singleton_pool.n_parallel > 1
    if runs_in_parallel:
        # Imported lazily: only needed when the environment is modified.
        from os import environ

        # NOTE(review): an empty device list presumably keeps the workers
        # off the GPU -- confirm against the intended deployment.
        environ['CUDA_VISIBLE_DEVICES'] = ''
    g.worker_id = id

def initialize(n_parallel):
    """Initialize the worker pool.

    SIGINT is blocked in the calling thread while the pool is created and
    every worker runs ``_worker_init``. Afterwards the signal is unblocked
    again, but only if it was not already blocked before this call, so a
    caller's own signal mask is left untouched.

    :param n_parallel: number of workers, forwarded to
        ``singleton_pool.initialize``
    """
    # SIGINT is blocked for all processes created in parallel_sampler to avoid
    # the creation of sleeping and zombie processes.
    #
    # If the user interrupts run_experiment, there's a chance some processes
    # won't die due to a deadlock condition where one of the children in the
    # parallel sampler exits without releasing a lock after it catches
    # SIGINT.
    #
    # Later the parent tries to acquire the same lock to proceed with its
    # cleanup, but it remains sleeping, waiting for the lock to be released.
    # In the meantime, all the processes in the parallel sampler remain in the
    # zombie state since the parent cannot proceed with their cleanup.
    #
    # The call is made outside the try block: if blocking itself fails there
    # is nothing to undo. pthread_sigmask returns the mask that was in effect
    # before the change.
    previous_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
    try:
        singleton_pool.initialize(n_parallel)
        singleton_pool.run_each(
            _worker_init,
            [(worker_id, )
             for worker_id in range(singleton_pool.n_parallel)])
    finally:
        # Only undo what this function did: leave SIGINT blocked when the
        # caller had already blocked it.
        if signal.SIGINT not in previous_mask:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT])
def _get_scoped_g(g, scope):
    """Return the globals object to use for ``scope``.

    :param g: the worker's globals object
    :param scope: key of a nested namespace, or None to use ``g`` itself
    :return: ``g`` when ``scope`` is None; otherwise ``g.scopes[scope]``,
        created on first use as a ``SharedGlobal`` carrying the same
        ``worker_id`` as ``g``
    """
    if scope is None:
        return g
    if not hasattr(g, 'scopes'):
        g.scopes = {}
    if scope not in g.scopes:
        g.scopes[scope] = SharedGlobal()
        g.scopes[scope].worker_id = g.worker_id
    return g.scopes[scope]


def _worker_populate_task(g, env, policy, scope=None):
    """Unpickle ``env`` and ``policy`` and store them on the scoped globals.

    :param g: the worker's globals object
    :param env: pickled environment (bytes)
    :param policy: pickled policy (bytes)
    :param scope: optional scope key; None targets ``g`` directly
    """
    g = _get_scoped_g(g, scope)
    # NOTE: pickle.loads executes arbitrary code; these payloads must only
    # ever come from pickle.dumps in the parent process (see populate_task).
    g.env = pickle.loads(env)
    g.policy = pickle.loads(policy)


def _worker_terminate_task(g, scope=None):
    """Close the env and terminate the policy held by the scoped globals.

    Attributes that are missing or already None are skipped, so calling this
    more than once is harmless.

    :param g: the worker's globals object
    :param scope: optional scope key; None targets ``g`` directly
    """
    g = _get_scoped_g(g, scope)
    # Compare against None explicitly: an env or policy that happens to be
    # falsy (e.g. defines __len__ or __bool__) must still be released.
    env = getattr(g, 'env', None)
    if env is not None:
        env.close()
        g.env = None
    policy = getattr(g, 'policy', None)
    if policy is not None:
        policy.terminate()
        g.policy = None
def populate_task(env, policy, scope=None):
    """Install ``env`` and ``policy`` on the workers of the pool.

    When the pool has more than one worker, both objects are pickled once
    and the same serialized payload is handed to every worker. Otherwise
    they are attached directly to ``singleton_pool.G`` without copying.

    :param env: environment to install
    :param policy: policy to install
    :param scope: optional scope key; None targets the unscoped globals
    """
    logger.log('Populating workers...')
    worker_count = singleton_pool.n_parallel
    if worker_count <= 1:
        # avoid unnecessary copying
        target = _get_scoped_g(singleton_pool.G, scope)
        target.env = env
        target.policy = policy
    else:
        payload = (pickle.dumps(env), pickle.dumps(policy), scope)
        singleton_pool.run_each(_worker_populate_task,
                                [payload] * worker_count)
    logger.log('Populated')
def terminate_task(scope=None):
    """Run ``_worker_terminate_task`` on each worker of the pool.

    :param scope: optional scope key forwarded to every worker; None targets
        the unscoped globals
    """
    per_worker_args = [(scope, ) for _ in range(singleton_pool.n_parallel)]
    singleton_pool.run_each(_worker_terminate_task, per_worker_args)
def close():
    """Shut down the worker pool by delegating to ``singleton_pool.close``."""
    singleton_pool.close()
def _worker_set_seed(_, seed):
    """Log the seed and apply it in the current worker.

    :param _: worker globals object (unused)
    :param seed: seed value passed to ``deterministic.set_seed``
    """
    message = 'Setting seed to %d' % seed
    logger.log(message)
    deterministic.set_seed(seed)
def set_seed(seed):
    """Seed every worker, giving each one a distinct value.

    Worker number ``k`` (counting from zero) receives ``seed + k``.

    :param seed: base seed
    """
    worker_seeds = [(seed + offset, )
                    for offset in range(singleton_pool.n_parallel)]
    singleton_pool.run_each(_worker_set_seed, worker_seeds)
def _worker_set_policy_params(g, params, scope=None):
    """Load ``params`` into the policy stored on the scoped globals.

    :param g: the worker's globals object
    :param params: values passed to the policy's ``set_param_values``
    :param scope: optional scope key; None targets ``g`` directly
    """
    scoped = _get_scoped_g(g, scope)
    scoped.policy.set_param_values(params)


def _worker_collect_one_path(g, max_path_length, scope=None):
    """Roll out a single path using the worker's env and policy.

    :param g: the worker's globals object
    :param max_path_length: passed to ``rollout`` as ``max_path_length``
    :param scope: optional scope key; None targets ``g`` directly
    :return: tuple ``(path, n)`` where ``n`` is ``len(path['rewards'])``
    """
    scoped = _get_scoped_g(g, scope)
    trajectory = rollout(scoped.env,
                         scoped.policy,
                         max_path_length=max_path_length)
    n_steps = len(trajectory['rewards'])
    return trajectory, n_steps
def sample_paths(policy_params,
                 max_samples,
                 max_path_length=np.inf,
                 scope=None):
    """Sample paths from each worker.

    The policy parameters are first sent to every worker; paths are then
    collected through ``singleton_pool.run_collect`` with ``max_samples`` as
    the threshold.

    :param policy_params: parameters for the policy. This will be updated on
        each worker process
    :param max_samples: desired maximum number of samples to be collected.
        The actual number of collected samples might be greater since all
        trajectories will be rolled out either until termination or until
        max_path_length is reached
    :param max_path_length: horizon / maximum length of a single trajectory
    :param scope: optional scope key forwarded to the workers; None targets
        the unscoped globals
    :return: a list of collected paths
    """
    # Every worker must hold the new parameters before any rollout starts.
    broadcast_args = [(policy_params, scope)] * singleton_pool.n_parallel
    singleton_pool.run_each(_worker_set_policy_params, broadcast_args)

    collect_args = (max_path_length, scope)
    return singleton_pool.run_collect(_worker_collect_one_path,
                                      threshold=max_samples,
                                      args=collect_args,
                                      show_prog_bar=True)