garage.sampler.multiprocessing_sampler
A multiprocessing sampler which avoids waiting as much as possible.
class MultiprocessingSampler(worker_factory, agents, envs)
Bases: garage.sampler.sampler.Sampler
Sampler that uses multiprocessing to distribute workers.
- Parameters
worker_factory (WorkerFactory) – Pickleable factory for creating workers. It should be transmitted to the processes / nodes where work needs to be done, and the workers should then be constructed there.
agents (Policy or List[Policy]) – Agent(s) to use to sample episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
envs (Environment or List[Environment]) – Environment from which episodes are sampled. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
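The rule shared by agents and envs (either a single value, or a list of exactly worker_factory.n_workers entries spread one per worker) can be sketched as follows. spread_across_workers is a hypothetical helper, not part of garage, and it assumes that a single non-list value is given to every worker.

```python
from typing import List, TypeVar, Union

T = TypeVar("T")


def spread_across_workers(value: Union[T, List[T]], n_workers: int) -> List[T]:
    """Return one entry per worker (hypothetical helper, not garage's API)."""
    if isinstance(value, list):
        # A list must match the worker count exactly: entry i goes to worker i.
        if len(value) != n_workers:
            raise ValueError(
                f"expected {n_workers} entries, got {len(value)}")
        return list(value)
    # Assumption: a single (non-list) value is shared by every worker. The
    # returned list repeats the same object; each worker process would
    # receive its own copy once the value is sent to it.
    return [value] * n_workers


# Three workers: one shared environment, three distinct agents.
print(spread_across_workers("env", 3))  # ['env', 'env', 'env']
print(spread_across_workers(["agent0", "agent1", "agent2"], 3))  # ['agent0', 'agent1', 'agent2']
```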
classmethod from_worker_factory(cls, worker_factory, agents, envs)
Construct this sampler.
- Parameters
worker_factory (WorkerFactory) – Pickleable factory for creating workers. It should be transmitted to the processes / nodes where work needs to be done, and the workers should then be constructed there.
agents (Policy or List[Policy]) – Agent(s) to use to sample episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
envs (Environment or List[Environment]) – Environment from which episodes are sampled. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
- Returns
An instance of cls.
- Return type
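Returning "an instance of cls" is the usual alternate-constructor pattern: a subclass that calls the classmethod gets an instance of itself. A minimal sketch with hypothetical ToySampler classes, assuming the classmethod simply forwards its arguments to the constructor (the parameter lists match, but the forwarding is an assumption):

```python
class ToySampler:
    """Stand-in base class (hypothetical, not garage.sampler.sampler.Sampler)."""

    def __init__(self, worker_factory, agents, envs):
        self.worker_factory = worker_factory
        self.agents = agents
        self.envs = envs

    @classmethod
    def from_worker_factory(cls, worker_factory, agents, envs):
        # Building via cls rather than a hard-coded class name means a
        # subclass calling this method gets an instance of the subclass.
        return cls(worker_factory, agents, envs)


class ToyMultiprocessingSampler(ToySampler):
    """Hypothetical subclass that inherits the alternate constructor."""


sampler = ToyMultiprocessingSampler.from_worker_factory("factory", "agent", "env")
print(type(sampler).__name__)  # ToyMultiprocessingSampler
```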
obtain_samples(self, itr, num_samples, agent_update, env_update=None)
Collect at least a given number of transitions (timesteps).
- Parameters
itr (int) – The current iteration number. Using this argument is deprecated.
num_samples (int) – Minimum number of transitions / timesteps to sample.
agent_update (object) – Value which will be passed into the agent_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
env_update (object) – Value which will be passed into the env_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
- Returns
The batch of collected episodes.
- Return type
- Raises
AssertionError – On internal errors.
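Collecting "at least" num_samples transitions implies that whole episodes are kept, so the batch can overshoot the requested count. A single-process sketch, assuming episodes are only ever returned whole; collect_at_least and the (worker_number, n_steps) tuples are hypothetical stand-ins for the sampler's internals and for real episode batches:

```python
from typing import Iterable, List, Tuple

# Hypothetical episode record: (worker_number, n_steps). Real episodes also
# carry observations, actions, rewards, and so on.
Episode = Tuple[int, int]


def collect_at_least(stream: Iterable[Episode], num_samples: int) -> List[Episode]:
    """Keep whole episodes until at least num_samples steps are collected."""
    batch: List[Episode] = []
    completed = 0
    for episode in stream:
        batch.append(episode)
        completed += episode[1]
        if completed >= num_samples:
            break  # a real sampler would presumably stop its workers here
    # If the stream runs dry first, this sketch just returns what it has.
    return batch


# Episodes arrive in completion order from three workers.
lengths = [40, 25, 60, 30, 50]
stream = ((i % 3, n) for i, n in enumerate(lengths))
batch = collect_at_least(stream, num_samples=100)
print(batch)                     # [(0, 40), (1, 25), (2, 60)]
print(sum(n for _, n in batch))  # 125, which overshoots 100
```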
obtain_exact_episodes(self, n_eps_per_worker, agent_update, env_update=None)
Sample an exact number of episodes per worker.
- Parameters
n_eps_per_worker (int) – Exact number of episodes to gather for each worker.
agent_update (object) – Value which will be passed into the agent_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
env_update (object) – Value which will be passed into the env_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
- Returns
Batch of gathered episodes, always in worker order: first all episodes from worker 0, then all episodes from worker 1, and so on.
- Return type
- Raises
AssertionError – On internal errors.
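Because episodes from different workers can finish in any order, returning them in worker order requires buffering them per worker. A sketch of that reordering, assuming surplus episodes from a worker that has already reached n_eps_per_worker are discarded; exact_in_worker_order and the (worker_number, episode) tuples are hypothetical:

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical arrival record: (worker_number, episode). Arrival order across
# workers is arbitrary.
Arrival = Tuple[int, str]


def exact_in_worker_order(arrivals: Iterable[Arrival], n_workers: int,
                          n_eps_per_worker: int) -> List[str]:
    per_worker: Dict[int, List[str]] = {w: [] for w in range(n_workers)}
    for worker_number, episode in arrivals:
        bucket = per_worker[worker_number]
        # Assumption: surplus episodes from a worker that already met its
        # quota are dropped.
        if len(bucket) < n_eps_per_worker:
            bucket.append(episode)
        if all(len(b) == n_eps_per_worker for b in per_worker.values()):
            break
    missing = {w: n_eps_per_worker - len(b)
               for w, b in per_worker.items() if len(b) < n_eps_per_worker}
    if missing:
        raise RuntimeError(f"stream ended early; missing {missing}")
    # Worker order: all of worker 0's episodes, then worker 1's, and so on.
    return [ep for w in range(n_workers) for ep in per_worker[w]]


arrivals = [(1, "w1-a"), (0, "w0-a"), (1, "w1-b"), (1, "w1-c"), (0, "w0-b")]
print(exact_in_worker_order(arrivals, n_workers=2, n_eps_per_worker=2))
# ['w0-a', 'w0-b', 'w1-a', 'w1-b']
```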
start_worker(self)
Initialize the sampler, i.e. launch parallel workers if necessary.
This method is deprecated; launch workers at construction time instead.
run_worker(factory, to_worker, to_sampler, worker_number, agent, env)
Run the streaming worker state machine.
Starts in the “not streaming” state. Enters the “streaming” state when the “start” or “continue” message is received. While in the “streaming” state, it streams episodes back to the parent process. When it receives a “stop” message, or the queue back to the parent process is full, it enters the “not streaming” state. When it receives the “exit” message, it terminates.
Critically, the worker never blocks when sending messages back to the sampler, so that it remains responsive to incoming messages.
- Parameters
factory (WorkerFactory) – Pickleable factory for creating workers. It should be transmitted to the processes / nodes where work needs to be done, and the workers should then be constructed there.
to_worker (multiprocessing.Queue) – Queue to send commands to the worker.
to_sampler (multiprocessing.Queue) – Queue to send episodes back to the sampler.
worker_number (int) – Number of this worker.
agent (Policy) – Agent this worker uses to sample episodes.
env (Environment) – Environment from which this worker samples episodes.
- Raises
AssertionError – On internal errors.
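The state machine described for run_worker can be sketched with two queues. This is a simplified illustration, not garage's implementation: it assumes messages are (tag, payload) tuples, that "start" carries an update (modeled here as an integer version) while "continue" resumes without one, and that an episode which cannot be sent is dropped. run_worker_sketch and sample_episode are hypothetical names. It uses a thread and queue.Queue to stay self-contained; multiprocessing.Queue offers the same put_nowait/get_nowait calls and raises the same queue.Full/queue.Empty exceptions.

```python
import queue
import threading
from typing import Callable


def run_worker_sketch(to_worker: queue.Queue, to_sampler: queue.Queue,
                      worker_number: int,
                      sample_episode: Callable[[int], object]) -> None:
    """Toy version of the streaming state machine (not garage's code)."""
    streaming = False  # start in the "not streaming" state
    version = 0        # assumed payload of "start" (e.g. a policy update)
    while True:
        try:
            # Idle: block until a command arrives. Streaming: only poll, so
            # sampling continues between commands.
            tag, payload = to_worker.get_nowait() if streaming else to_worker.get()
        except queue.Empty:
            tag, payload = None, None
        if tag == "start":
            version, streaming = payload, True
        elif tag == "continue":
            streaming = True
        elif tag == "stop":
            streaming = False
        elif tag == "exit":
            return
        if streaming:
            episode = sample_episode(version)
            try:
                # Never block on the sampler; a full queue means "stop".
                to_sampler.put_nowait((worker_number, version, episode))
            except queue.Full:
                streaming = False  # this sketch drops the unsent episode


to_worker: queue.Queue = queue.Queue()
to_sampler: queue.Queue = queue.Queue(maxsize=3)
worker = threading.Thread(
    target=run_worker_sketch,
    args=(to_worker, to_sampler, 0, lambda v: f"episode@v{v}"),
    daemon=True)
worker.start()
to_worker.put(("start", 1))
# The worker pauses itself whenever to_sampler fills; in this sketch it
# resumes only on a new "start" or "continue" message.
received = [to_sampler.get(timeout=5) for _ in range(3)]
to_worker.put(("exit", None))
worker.join(timeout=5)
print(received[0])  # (0, 1, 'episode@v1')
```

With real processes, the target and its arguments generally have to be picklable (the lambda above would not be under the "spawn" start method), which is one reason the worker factory is documented as pickleable.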