Source code for garage.tf.experiment.local_tf_runner

"""
The local runner for TensorFlow algorithms.

A runner setup context for algorithms during initialization and
pipelines data between sampler and algorithm during training.
"""
from dowel import logger
import tensorflow as tf

from garage.experiment import LocalRunner


[docs]class LocalTFRunner(LocalRunner): """This class implements a local runner for TensorFlow algorithms. A local runner provides a default TensorFlow session using python context. This is useful for those experiment components (e.g. policy) that require a TensorFlow session during construction. Use Runner.setup(algo, env) to setup algorithm and environement for runner and Runner.train() to start training. Args: snapshot_config (garage.experiment.SnapshotConfig): The snapshot configuration used by LocalRunner to create the snapshotter. If None, it will create one with default settings. max_cpus (int): The maximum number of parallel sampler workers. sess (tf.Session): An optional TensorFlow session. A new session will be created immediately if not provided. Note: The local runner will set up a joblib task pool of size max_cpus possibly later used by BatchSampler. If BatchSampler is not used, the processes in the pool will remain dormant. This setup is required to use TensorFlow in a multiprocess environment before a TensorFlow session is created because TensorFlow is not fork-safe. See https://github.com/tensorflow/tensorflow/issues/2448. When resume via command line, new snapshots will be saved into the SAME directory if not specified. When resume programmatically, snapshot directory should be specify manually or through run_experiment() interface. Examples: # to train with LocalTFRunner() as runner: env = gym.make('CartPole-v1') policy = CategoricalMLPPolicy( env_spec=env.spec, hidden_sizes=(32, 32)) algo = TRPO( env=env, policy=policy, baseline=baseline, max_path_length=100, discount=0.99, max_kl_step=0.01) runner.setup(algo, env) runner.train(n_epochs=100, batch_size=4000) # to resume immediately. with LocalTFRunner() as runner: runner.restore(resume_from_dir) runner.resume() # to resume with modified training arguments. with LocalTFRunner() as runner: runner.restore(resume_from_dir) runner.resume(n_epochs=20) """ def __init__(self, snapshot_config, sess=None, max_cpus=1): super().__init__(snapshot_config=snapshot_config, max_cpus=max_cpus) self.sess = sess or tf.compat.v1.Session() self.sess_entered = False def __enter__(self): """Set self.sess as the default session. Returns: This local runner. """ if tf.compat.v1.get_default_session() is not self.sess: self.sess.__enter__() self.sess_entered = True return self def __exit__(self, exc_type, exc_val, exc_tb): """Leave session.""" if tf.compat.v1.get_default_session( ) is self.sess and self.sess_entered: self.sess.__exit__(exc_type, exc_val, exc_tb) self.sess_entered = False
[docs] def setup(self, algo, env, sampler_cls=None, sampler_args=None): """Set up runner and sessions for algorithm and environment. This method saves algo and env within runner and creates a sampler, and initializes all uninitialized variables in session. Note: After setup() is called all variables in session should have been initialized. setup() respects existing values in session so policy weights can be loaded before setup(). Args: algo (garage.np.algos.RLAlgorithm): An algorithm instance. env (garage.envs.GarageEnv): An environement instance. sampler_cls (garage.sampler.Sampler): A sampler class. sampler_args (dict): Arguments to be passed to sampler constructor. """ self.initialize_tf_vars() logger.log(self.sess.graph) super().setup(algo, env, sampler_cls, sampler_args)
[docs] def initialize_tf_vars(self): """Initialize all uninitialized variables in session.""" with tf.name_scope('initialize_tf_vars'): uninited_set = [ e.decode() for e in self.sess.run( tf.compat.v1.report_uninitialized_variables()) ] self.sess.run( tf.compat.v1.variables_initializer([ v for v in tf.compat.v1.global_variables() if v.name.split(':')[0] in uninited_set ]))