Source code for garage.sampler.worker_factory

"""Worker factory used by Samplers to construct Workers."""
import psutil

from garage.experiment.deterministic import get_seed
from garage.sampler.default_worker import DefaultWorker


[docs]def identity_function(value): """Do nothing. This function exists so it can be pickled. Args: value(object): A value. Returns: object: The value. """ return value
[docs]class WorkerFactory: """Constructs workers for Samplers. The intent is that this object should be sufficient to avoid subclassing the sampler. Instead of subclassing the sampler for e.g. a specific backend, implement a specialized WorkerFactory (or specify appropriate functions to this one). Not that this object must be picklable, since it may be passed to workers. However, its fields individually need not be. All arguments to this type must be passed by keyword. Args: max_episode_length(int): The maximum length episodes which will be sampled. is_tf_worker (bool): Whether it is workers for TFTrainer. seed(int): The seed to use to initialize random number generators. n_workers(int): The number of workers to use. worker_class(type): Class of the workers. Instances should implement the Worker interface. worker_args (dict or None): Additional arguments that should be passed to the worker. """ def __init__( self, *, # Require passing by keyword. max_episode_length, is_tf_worker=False, seed=get_seed(), n_workers=psutil.cpu_count(logical=False), worker_class=DefaultWorker, worker_args=None): self.n_workers = n_workers self._seed = seed self._max_episode_length = max_episode_length if is_tf_worker: # Import here to avoid hard dependency on TF. # pylint: disable=import-outside-toplevel from garage.tf.samplers import TFWorkerClassWrapper worker_class = TFWorkerClassWrapper(worker_class) self._worker_class = worker_class if worker_args is None: self._worker_args = {} else: self._worker_args = worker_args
[docs] def prepare_worker_messages(self, objs, preprocess=identity_function): """Take an argument and canonicalize it into a list for all workers. This helper function is used to handle arguments in the sampler API which may (optionally) be lists. Specifically, these are agent, env, agent_update, and env_update. Checks that the number of parameters is correct. Args: objs(object or list): Must be either a single object or a list of length n_workers. preprocess(function): Function to call on each single object before creating the list. Raises: ValueError: If a list is passed of a length other than `n_workers`. Returns: List[object]: A list of length self.n_workers. """ if isinstance(objs, list): if len(objs) != self.n_workers: raise ValueError( 'Length of list doesn\'t match number of workers') return [preprocess(obj) for obj in objs] else: return [preprocess(objs) for _ in range(self.n_workers)]
def __call__(self, worker_number): """Construct a worker given its number. Args: worker_number(int): The worker number. Should be at least 0 and less than or equal to `n_workers`. Raises: ValueError: If the worker number is greater than `n_workers`. Returns: garage.sampler.Worker: The constructed worker. """ if worker_number >= self.n_workers: raise ValueError('Worker number is too big') return self._worker_class(worker_number=worker_number, seed=self._seed, max_episode_length=self._max_episode_length, **self._worker_args)