garage.sampler.multiprocessing_sampler

A multiprocessing sampler which avoids waiting as much as possible.

class MultiprocessingSampler(worker_factory, agents, envs)

Bases: garage.sampler.sampler.Sampler

Sampler that uses multiprocessing to distribute workers.

Parameters
  • worker_factory (WorkerFactory) – Pickleable factory for creating workers. Should be transmitted to other processes / nodes where work needs to be done, then workers should be constructed there.

  • agents (Policy or List[Policy]) – Agent(s) to use to sample episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

  • envs (Environment or List[Environment]) – Environment from which episodes are sampled. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.
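The "single object or list of length n_workers" rule for agents and envs can be illustrated with a minimal sketch. The helper name `spread_across_workers` is hypothetical and is not part of garage. The choice of `ValueError` for a length mismatch is also an assumption made for this example.

```python
def spread_across_workers(value, n_workers):
    """Return one entry per worker.

    Hypothetical helper for illustration only, not garage's API.
    """
    if isinstance(value, list):
        # A list must provide exactly one entry per worker.
        if len(value) != n_workers:
            raise ValueError(
                f'expected {n_workers} entries, got {len(value)}')
        return list(value)
    # A single object is reused for every worker.
    return [value] * n_workers


print(spread_across_workers('policy', 3))
# ['policy', 'policy', 'policy']
print(spread_across_workers(['env_a', 'env_b'], 2))
# ['env_a', 'env_b']
```

When entries are sent to separate processes they are pickled, so each worker ends up with its own copy even if the same object was listed several times.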

classmethod from_worker_factory(cls, worker_factory, agents, envs)

Construct this sampler.

Parameters
  • worker_factory (WorkerFactory) – Pickleable factory for creating workers. Should be transmitted to other processes / nodes where work needs to be done, then workers should be constructed there.

  • agents (Policy or List[Policy]) – Agent(s) to use to sample episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

  • envs (Environment or List[Environment]) – Environment from which episodes are sampled. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

Returns

An instance of cls.

Return type

Sampler

obtain_samples(self, itr, num_samples, agent_update, env_update=None)

Collect at least a given number of transitions (timesteps).

Parameters
  • itr (int) – The current iteration number. Using this argument is deprecated.

  • num_samples (int) – Minimum number of transitions / timesteps to sample.

  • agent_update (object) – Value which will be passed into the agent_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

  • env_update (object) – Value which will be passed into the env_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

Returns

The batch of collected episodes.

Return type

EpisodeBatch

Raises

AssertionError – On internal errors.
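The "at least num_samples" contract means the returned batch can contain more timesteps than requested, because only whole episodes are collected. The following is a minimal sketch of that stopping rule, not the library's implementation. `collect_at_least` and its `episode_lengths` argument are hypothetical stand-ins for episodes streamed back by workers.

```python
def collect_at_least(episode_lengths, num_samples):
    """Consume whole episodes until at least num_samples timesteps are held.

    episode_lengths is a hypothetical iterable of completed-episode lengths.
    """
    collected = []
    total = 0
    for length in episode_lengths:
        collected.append(length)
        total += length
        # Stop as soon as the minimum is met; the last episode may overshoot.
        if total >= num_samples:
            break
    return collected, total


lengths, total = collect_at_least([40, 35, 50, 20], num_samples=100)
print(lengths, total)  # [40, 35, 50] 125
```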

obtain_exact_episodes(self, n_eps_per_worker, agent_update, env_update=None)

Sample an exact number of episodes per worker.

Parameters
  • n_eps_per_worker (int) – Exact number of episodes to gather for each worker.

  • agent_update (object) – Value which will be passed into the agent_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

  • env_update (object) – Value which will be passed into the env_update_fn before sampling episodes. If a list is passed in, it must have length exactly worker_factory.n_workers, and will be spread across the workers.

Returns

Batch of gathered episodes, always in worker order. In other words, first all episodes from worker 0, then all episodes from worker 1, etc.

Return type

EpisodeBatch

Raises

AssertionError – On internal errors.
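Because workers run in parallel, episodes can arrive at the sampler in any order, so returning them in worker order requires regrouping. The sketch below shows one way this could be done, assuming each episode arrives tagged with its worker number. `order_by_worker` is a hypothetical helper, and discarding surplus episodes is an assumption of this example rather than documented behavior.

```python
from collections import defaultdict


def order_by_worker(tagged_episodes, n_workers, n_eps_per_worker):
    """Group (worker_number, episode) pairs into worker order."""
    by_worker = defaultdict(list)
    for worker_number, episode in tagged_episodes:
        # Keep only the first n_eps_per_worker episodes from each worker.
        if len(by_worker[worker_number]) < n_eps_per_worker:
            by_worker[worker_number].append(episode)
    ordered = []
    for worker_number in range(n_workers):
        ordered.extend(by_worker[worker_number])
    return ordered


# Episodes arrive interleaved; worker 1 also sends one surplus episode.
arrivals = [(1, 'b0'), (0, 'a0'), (1, 'b1'), (0, 'a1'), (1, 'b2')]
print(order_by_worker(arrivals, n_workers=2, n_eps_per_worker=2))
# ['a0', 'a1', 'b0', 'b1']
```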

shutdown_worker(self)

Shut down the workers.

start_worker(self)

Initialize the sampler, i.e. launch parallel workers if necessary.

This method is deprecated; launch workers in the constructor instead.

run_worker(factory, to_worker, to_sampler, worker_number, agent, env)

Run the streaming worker state machine.

Starts in the “not streaming” state. Enters the “streaming” state when the “start” or “continue” message is received. While in the “streaming” state, it streams episodes back to the parent process. When it receives a “stop” message, or the queue back to the parent process is full, it enters the “not streaming” state. When it receives the “exit” message, it terminates.

Critically, the worker never blocks on sending messages back to the sampler, to ensure it remains responsive to messages.

Parameters
  • factory (WorkerFactory) – Pickleable factory for creating workers. Should be transmitted to other processes / nodes where work needs to be done, then workers should be constructed there.

  • to_worker (multiprocessing.Queue) – Queue to send commands to the worker.

  • to_sampler (multiprocessing.Queue) – Queue to send episodes back to the sampler.

  • worker_number (int) – Number of this worker.

  • agent (Policy) – Agent to use to sample episodes.

  • env (Environment) – Environment from which episodes are sampled.

Raises

AssertionError – On internal errors.
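The streaming state machine described for run_worker can be sketched with the standard library alone. This is an illustration under stated assumptions, not garage's implementation: `run_worker_sketch`, `sample_episode`, and `demo` are hypothetical names, the messages are plain strings, and `queue.Queue` with a thread stands in for `multiprocessing.Queue` with a process (both offer `get_nowait` and `put_nowait` and raise `queue.Empty` and `queue.Full`). In this sketch, an episode that does not fit in the full queue is dropped.

```python
import itertools
import queue
import threading
import time


def run_worker_sketch(to_worker, to_sampler, worker_number, sample_episode):
    """Stream episodes while told to, without ever blocking on to_sampler."""
    streaming = False
    while True:
        if streaming:
            # While streaming, poll for commands without blocking.
            try:
                tag = to_worker.get_nowait()
            except queue.Empty:
                tag = None
        else:
            # When not streaming, waiting for the next command is all
            # there is to do.
            tag = to_worker.get()
        if tag in ('start', 'continue'):
            streaming = True
        elif tag == 'stop':
            streaming = False
        elif tag == 'exit':
            return
        if streaming:
            episode = sample_episode()
            try:
                to_sampler.put_nowait((worker_number, episode))
            except queue.Full:
                # The sampler is not keeping up: stop streaming rather
                # than block, so commands are still handled promptly.
                streaming = False


def demo():
    """Start one worker, let it fill a 2-slot queue, then shut it down."""
    to_worker = queue.Queue()
    to_sampler = queue.Queue(maxsize=2)
    counter = itertools.count()
    worker = threading.Thread(
        target=run_worker_sketch,
        args=(to_worker, to_sampler, 0, lambda: next(counter)))
    worker.start()
    to_worker.put('start')
    while not to_sampler.full():
        time.sleep(0.001)
    to_worker.put('exit')
    worker.join()
    return [to_sampler.get_nowait() for _ in range(to_sampler.qsize())]


if __name__ == '__main__':
    print(demo())  # [(0, 0), (0, 1)]
```

The key design point is the `put_nowait` call: a blocking `put` on a full queue would leave the worker unable to see a "stop" or "exit" message until the sampler drained the queue.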