Source code for commonpower.control.runners

"""
Runners to manage training/deployment in systems with both RL and non-RL controllers.
"""
from __future__ import annotations

import os
import random
import time
import warnings
from collections import OrderedDict, deque
from datetime import datetime, timedelta
from itertools import chain
from typing import List, Tuple, Union

import gymnasium as gym
import numpy as np
import torch
from pyomo.opt import TerminationCondition
from pyomo.opt.solver import OptSolver
from stable_baselines3 import PPO, SAC
from stable_baselines3.common.base_class import BasePolicy
from stable_baselines3.common.utils import safe_mean
from tqdm import tqdm

import wandb
from commonpower.control.configs.algorithms import MAPPOBaseConfig, SB3MetaConfig
from commonpower.control.controllers import OptimalController, RLBaseController
from commonpower.control.environments import ControlEnv
from commonpower.control.logging_utils.loggers import BaseLogger, TensorboardLogger
from commonpower.control.util import t2n
from commonpower.control.wrappers import DeploymentWrapper
from commonpower.core import System
from commonpower.modeling.history import ModelHistory
from commonpower.utils.cp_exceptions import InstanceError
from commonpower.utils.default_solver import get_default_solver


[docs] class BaseRunner: def __init__( self, sys: System, global_controller: OptimalController = None, horizon: timedelta = timedelta(hours=24), dt: timedelta = timedelta(minutes=60), continuous_control: bool = False, history: ModelHistory = None, solver: OptSolver = get_default_solver(), seed: int = None, normalize_actions: bool = True, ): """ Base class for any runner for power system control with one or multiple agents. Initializes the system and its controllers. Can be used for training one or multiple reinforcement learning (RL) agents or for deploying agents. Subclasses mainly have to implement the 'run()' method. Args: sys (System): power system to be controlled global_controller (OptimalController): instance of controller taking over control of all nodes that have not yet been assigned a controller. Mostly used to balance the system using a market node or a generator. Defaults to OptimalController("global"). horizon (timedelta): amount of time that the controller looks into the future dt (timedelta): control time interval continuous_control (bool): whether to use an infinite control horizon history (ModelHistory): logger solver (OptSolver): solver for optimization problem seed (int): seed for the global random number generator of numpy (we use np.random.seed(seed) instead of instantiating our own generator) normalize_actions (bool): whether or not to normalize the action space Returns: BaseRunner """ self.sys = sys self.controllers = None self.rl_controllers = None self.model_inst = None self.env = None self.solver = solver # time handling self.start_time = None self.horizon = horizon self.episode_length = None self.dt = dt self.continuous_control = continuous_control # controller to balance system self.global_controller = global_controller or OptimalController("global") self.global_controller = self.global_controller.add_system(self.sys) # model history for logging self.history = history or ModelHistory([sys]) # seed for global random number generator if seed is not None: self.seed = seed else: # warnings.warn("No seed given. Selecting a random seed.") self.seed = random.randint(1, 100) # normalizing actions of controllers (just matters for RL controllers) self.normalize_actions = normalize_actions
[docs] def run(self, n_steps: int = 24, fixed_start: datetime = None) -> None: """ Simulates the scenario for a given number of time steps. Args: n_steps (int): number of steps to run fixed_start (datetime): whether to run from a fixed given start timestamp """ self.fixed_start = fixed_start self._run(n_steps)
def _run(self, n_steps: int = 24) -> None: raise NotImplementedError
[docs] def prepare_run(self): """ Prepare the training or deployment by initializing the system and its controllers. Assigns a global controller that takes over control of all entities which require inputs and have not been assigned a controller by the system's set-up. Returns: None """ # initialize system eps_horizon = timedelta(hours=0) if self.episode_length is None else self.dt * self.episode_length self.sys.initialize( horizon=self.horizon, episode_horizon=eps_horizon, tau=self.dt, continuous_control=self.continuous_control, solver=self.solver, ) # test whether system set-up is feasible self.system_feasible() # get controllers self.controllers = self.sys.get_controllers() self.rl_controllers = self.sys.get_controllers(ctrl_types=[RLBaseController]) # seeding the global random number generator of the random & numpy module (will mainly be used for initializers) random.seed(self.seed) np.random.seed(self.seed) torch.manual_seed(self.seed) torch.cuda.manual_seed_all(self.seed) if self.start_time is None: self.start_time = self.sys.sample_start_date(fixed_start=self.fixed_start) self.sys.reset(self.start_time) self.model_inst = self.sys.instance
[docs] def finish_run(self): """ Terminates run. Returns: None """ # remove dummy controller from system self.sys.controllers.pop(self.global_controller.name, None) self.global_controller.detach()
[docs] def set_start_time(self, start_time: datetime): """ Set start time from external. Args: start_time (datetime): date and hour at which to reset the system before starting a run. Returns: None """ self.start_time = start_time
[docs] def system_feasible(self, n_checks: int = 1): """ Check whether the current system set-up is feasible. Args: n_checks (int): number of feasibility checks to run Returns: None """ for i in range(n_checks): start_time = self.sys.sample_start_date(fixed_start=None) self.sys.reset(start_time) inst = self.sys.instance results = self.solver.solve(inst) if results.solver.termination_condition in [ TerminationCondition.infeasible, TerminationCondition.unbounded, TerminationCondition.infeasibleOrUnbounded, ]: raise InstanceError( inst, "Solving the model is infeasible or unbounded, please consider your system set-up" )
[docs] class BaseTrainer(BaseRunner): def __init__( self, sys: System, global_controller: OptimalController = OptimalController("global"), wrapper: gym.Wrapper = None, horizon: timedelta = timedelta(hours=24), episode_length: int = 24, dt: timedelta = timedelta(minutes=60), continuous_control: bool = False, history: ModelHistory = None, solver: OptSolver = get_default_solver(), save_path: str = "./saved_models/test_model", seed: int = None, normalize_actions: bool = True, limited_date_range: List[datetime] = None, ): """ Base class for any runner used for training one or multiple reinforcement learning (RL) agents. Args: sys (System): power system to be controlled global_controller (OptimalController): instance of controller taking over control of all nodes that have not yet been assigned a controller. Mostly used to balance the system using a market node or a generator. Defaults to OptimalController("global"). wrapper (gym.Wrapper): wrapper for the environment that handles the RL agents during training (used for example for single-agent RL control). horizon (timedelta): amount of time that the controller looks into the future episode_length (int): number of time steps to simulate before the system is reset during RL training if continuous_control=False dt (timedelta): control time interval continuous_control (bool): whether to use an infinite control horizon history (ModelHistory): logger solver (OptSolver): solver for optimization problem save_path (str): local path to folder in which the trained policy will be stored (as .zip file) after the training is finished seed (int): seed for the global random number generator of numpy (we use np.random.seed(seed) instead of instantiating our own generator) normalize_actions (bool): whether or not to normalize the action space limited_date_range (list): limits the system's date range such that we only train over a specific interval Returns: BaseTrainer """ super().__init__( sys=sys, global_controller=global_controller, horizon=horizon, dt=dt, continuous_control=continuous_control, history=history, solver=solver, seed=seed, normalize_actions=normalize_actions, ) self.limited_date_range = limited_date_range # environment wrapper function self.wrapper = wrapper # model save path self.save_path = save_path # episode length for learning self.episode_length = episode_length
[docs] def prepare_run(self): """ In addition to the preparation in BaseRunner, we also instantiate an environment function as an API for the RL training. Returns: None """ super().prepare_run() # limit date range of system to only start training after changes if self.limited_date_range is not None: self.sys.limit_date_range(start=self.limited_date_range[0], end=self.limited_date_range[1]) # create environment function according to gymnasium API if len(list(self.sys.get_controllers(ctrl_types=[RLBaseController]))) >= 1: self.env = self.sys.create_env_func( episode_length=self.episode_length, wrapper=self.wrapper, fixed_start=self.fixed_start, normalize_actions=self.normalize_actions, )
[docs] class SingleAgentTrainer(BaseTrainer): def __init__( self, sys: System, alg_config: SB3MetaConfig, global_controller: OptimalController = OptimalController("global"), policy: BasePolicy = None, wrapper: gym.Wrapper = None, logger: BaseLogger = None, horizon: timedelta = timedelta(hours=24), episode_length: int = 24, dt: timedelta = timedelta(minutes=60), continuous_control: bool = False, history: ModelHistory = None, solver: OptSolver = get_default_solver(), save_path: str = "./saved_models/test_model", seed: int = None, normalize_actions: bool = True, limited_date_range: List[datetime] = None, ): """ Runner for training a single RL agent (with algorithms from the StableBaselines 3 repository). Args: sys (System): power system to be controlled global_controller (OptimalController): instance of controller taking over control of all nodes that have not yet been assigned a controller. Mostly used to balance the system using a market node or a generator. Defaults to OptimalController("global"). alg_config (SB3MetaConfig): configuration for the RL algorithm and policy to be trained policy (BasePolicy): policy instance (can be handed over to be retrained) wrapper (gym.Wrapper): wrapper for the environment that handles the RL agents during training (used for example for single-agent RL control). logger (BaseLogger): object for handling training logs horizon (timedelta): amount of time that the controller looks into the future episode_length (int): number of time steps to simulate before the system is reset during RL training if continuous_control=False dt (timedelta): control time interval continuous_control (bool): whether to use an infinite control horizon history (ModelHistory): logger solver (OptSolver): solver for optimization problem save_path (str): local path to folder in which the trained policy will be stored (as .zip file) after the training is finished seed (int): seed for the global random number generator of numpy (we use np.random.seed(seed) instead of instantiating our own generator) normalize_actions (bool): whether or not to normalize the action space limited_date_range (list): limits the system's date range such that we only train over a specific interval Returns: SingleAgentTrainer """ super().__init__( sys=sys, global_controller=global_controller, wrapper=wrapper, horizon=horizon, episode_length=episode_length, dt=dt, continuous_control=continuous_control, history=history, solver=solver, save_path=save_path, seed=seed, normalize_actions=normalize_actions, limited_date_range=limited_date_range, ) self.alg_config = alg_config self.policy = policy if logger is None: warnings.warn("No logger specified. Writing tensorboard log files to 'default_log/' directory.") self.logger = TensorboardLogger(log_dir="./default_log/") else: self.logger = logger
[docs] def _run(self, n_steps: int = 24): """ Runs the single-agent RL training algorithm for a given number of time steps and saves the trained policy. Returns: None """ self.prepare_run() training_steps = self.alg_config.total_steps # Define logging interval based on config of the algorithm if self.alg_config.algorithm == PPO: log_int = int(self.alg_config.algorithm_config.n_steps / self.episode_length) elif self.alg_config.algorithm == SAC: log_int = int(self.episode_length / self.alg_config.algorithm_config.train_freq) else: log_int = 1 print("Warning: Logging interval not defined for this algorithm. Logging after each training step.") if log_int < 1: log_int = 1 print("Warning: Logging interval was infeasible. Logging after each training step.") self.policy.learn(total_timesteps=training_steps, callback=self.logger.log_function(), log_interval=log_int) # store reference to model in controller for ctrl in self.sys.get_controllers(ctrl_types=[RLBaseController]).values(): ctrl.save(self.policy, save_path=self.save_path) self.finish_run()
[docs] def prepare_run(self): """ Prepare the training by initializing the system and its controllers. Assigns a global controller that takes over control of all entities which require inputs and have not been assigned a controller by the system's set-up. Sets an initial policy if no pre-trained policy was handed over at instantiation. Returns: None """ super().prepare_run() TrainAlg = self.alg_config.algorithm if not self.policy: self.policy = TrainAlg( env=self.env, tensorboard_log=self.logger.get_log_dir(), seed=self.seed, verbose=2, **self.alg_config.algorithm_config.model_dump(), # convert pydantic Model to dictionary )
[docs] def finish_run(self): super().finish_run() self.logger.finish_logging()
[docs] class DeploymentRunner(BaseRunner): def __init__( self, sys: System, global_controller: OptimalController = None, alg_config: Union[SB3MetaConfig, MAPPOBaseConfig] = None, wrapper: gym.Wrapper = None, horizon: timedelta = timedelta(hours=24), dt: timedelta = timedelta(minutes=60), continuous_control: bool = True, history: ModelHistory = None, solver: OptSolver = get_default_solver(), seed: int = None, normalize_actions: bool = True, ): """ Runner for the deployment of multiple heterogeneous controllers (RL, optimal control). Args: sys (System): power system to be controlled global_controller (OptimalController): instance of controller taking over control of all nodes that have not yet been assigned a controller. Mostly used to balance the system using a market node or a generator. Defaults to OptimalController("global"). alg_config (Union[SB3MetaConfig, MAPPOBaseConfig]): configuration for the RL algorithm and policy to be trained wrapper (gym.Wrapper): wrapper for the environment that handles the RL agents during training (used for example for single-agent RL control). horizon (timedelta): amount of time that the controller looks into the future dt (timedelta): control time interval continuous_control (bool): whether to use an infinite control horizon history (ModelHistory): logging solver (OptSolver): solver for optimization problem seed (int): seed for the global random number generator of numpy (we use np.random.seed(seed) instead of instantiating our own generator) normalize_actions (bool): whether or not to normalize the action space Returns: DeploymentRunner """ super().__init__( sys=sys, global_controller=global_controller, horizon=horizon, dt=dt, continuous_control=continuous_control, history=history, solver=solver, seed=seed, normalize_actions=normalize_actions, ) self.alg_config = alg_config self.wrapper = wrapper
[docs] def _run(self, n_steps: int = 24): """ Runs the deployment of multiple heterogeneous controllers for a given number of time steps. Args: n_steps (int): number of time steps to run the system for Returns: None """ self.prepare_run() # run obs, _ = self.env.reset() for step in tqdm(range(n_steps)): if self.rl_controllers: # we loop through all RL controllers to compute their actions given the current state. The union of all # actions will then be passed on to the Gym environment in the required format. rl_actions = OrderedDict() for ctrl_id, rl_ctrl in self.rl_controllers.items(): ctrl_obs = obs[ctrl_id] rl_actions[ctrl_id], _ = rl_ctrl.compute_control_input(obs=ctrl_obs, input_callback=None) else: # only optimal controllers --> actions will be computed in step() function of System rl_actions = None obs, reward, terminated, truncated, info = self.env.step(action=rl_actions) if step == n_steps - 1: # terminal step # we essentially only update the model history here self.sys.terminal_step(self.history) if terminated or truncated: if self.rl_controllers: obs, _ = self.env.reset() self.finish_run()
[docs] def prepare_run(self): """ Prepare the deployment by initializing the system and its controllers. Assigns a global controller that takes over control of all entities which require inputs and have not been assigned a controller by the system's set-up. Sets the operation mode of all RL controllers within the system to 'deployment'. Returns: """ super().prepare_run() # ToDo: more elegant way to solve this? if self.start_time is not None: self.fixed_start = self.start_time # We have to wrap the environment with a DeploymentWrapper to ensure compatibility self.env = DeploymentWrapper( self.sys.create_env_func( wrapper=self.wrapper, fixed_start=self.fixed_start, normalize_actions=self.normalize_actions, history=self.history, ) ) # set train flag of environment to False self.env.unwrapped.set_mode("deploy") # set train flag of RL runners to False for rl_ctrl in self.rl_controllers.values(): rl_ctrl.set_mode("deploy") # load RL policies self.alg_config.seed = self.seed # need to hand over the seed to re-load the policy if not rl_ctrl.policy: rl_ctrl.load(env=self.env, config=self.alg_config)
[docs] class MAPPOTrainer(BaseTrainer): def __init__( self, sys: System, alg_config: MAPPOBaseConfig, global_controller: OptimalController = OptimalController("global"), wrapper: gym.Wrapper = None, logger: BaseLogger = None, horizon: timedelta = timedelta(hours=24), episode_length: int = 24, dt: timedelta = timedelta(minutes=60), continuous_control: bool = False, history: ModelHistory = None, solver: OptSolver = get_default_solver(), save_path: str = "./saved_models/test_model", seed: int = None, normalize_actions: bool = True, limited_date_range: List[datetime] = None, ): """ Runner for training multiple heterogeneous agents with MAPPO/IPPO from the on-policy repository (https://github.com/marlbenchmark/on-policy/tree/main/onpolicy). Based on our BaseTrainer and our logging framework as well as the BaseRunner from the on-policy repository Args: sys (System): power system to be controlled global_controller (OptimalController): instance of controller taking over control of all nodes that have not yet been assigned a controller. Mostly used to balance the system using a market node or a generator. Defaults to OptimalController("global"). alg_config (MAPPOBaseConfig): configuration for the RL algorithm and policy to be trained wrapper (gym.Wrapper): wrapper for the environment that handles the RL agents during training (used for example for single-agent RL control). logger (BaseLogger): object for handling training logs horizon (timedelta): amount of time that the controller looks into the future episode_length (int): number of time steps to simulate before the system is reset during RL training if continuous_control=False dt (timedelta): control time interval continuous_control (bool): whether to use an infinite control horizon history (ModelHistory): logging solver (OptSolver): solver for optimization problem save_path (str): local path to folder in which the trained policy will be stored (as .zip file) after the training is finished seed (int): seed for the global random number generator of numpy (we use np.random.seed(seed) instead of instantiating our own generator) normalize_actions (bool): whether or not to normalize the action space limited_date_range (list): limits the system's date range such that we only train over a specific interval """ super().__init__( sys=sys, global_controller=global_controller, wrapper=wrapper, horizon=horizon, episode_length=episode_length, dt=dt, continuous_control=continuous_control, history=history, solver=solver, save_path=save_path, seed=seed, normalize_actions=normalize_actions, limited_date_range=limited_date_range, ) # logging self.logger = logger self.callback = logger.get_callback() self.log_function = logger.get_log_function() all_args = alg_config self.all_args = all_args # set device self._set_device() # check other arguments according to algorithm: self._check_alg_config() # parse arguments self._parse_alg_config() # IPPO/MAPPO-specific attributes self.num_agents = None # MAPPO enables training with multiple envs in parallel, which we currently do not use, # but still keep to preserve compatibility self.envs = None self.eval_envs = None # MAPPO enables evaluating training regulary on separate environments - also not used atm self.policy = [] # list of policies for each agent self.trainer = [] # list of training algorithm objects for each agent self.buffer = [] # list of buffers for each agent self.ep_info_buffer = [] # list of buffers for each agent which contain infos for each episode
[docs] def prepare_run(self): # import these here such that no errors will be thrown in case someone does not have the on-policy repo # installed from onpolicy.algorithms.r_mappo.algorithm.rMAPPOPolicy import R_MAPPOPolicy as Policy from onpolicy.algorithms.r_mappo.r_mappo import R_MAPPO as TrainAlgo from onpolicy.envs.env_wrappers import DummyVecEnv, SubprocVecEnv from onpolicy.utils.separated_buffer import SeparatedReplayBuffer """ Prepare the training or deployment by initializing the system and its controllers. Assigns a global controller that takes over control of all entities which require inputs and have not been assigned a controller by the system's set-up. Takes care of initializing the training environment such that it is compatible with MAPPO/IPPO algorithm Returns: None """ # initialize system self.sys.initialize( horizon=self.horizon, episode_horizon=self.dt * self.episode_length, tau=self.dt, continuous_control=self.continuous_control, ) # test whether system set-up is feasible self.system_feasible() # limit date range of system to only start training after changes if self.limited_date_range is not None: self.sys.limit_date_range(start=self.limited_date_range[0], end=self.limited_date_range[1]) # get controllers self.controllers = self.sys.get_controllers() self.rl_controllers = self.sys.get_controllers(ctrl_types=[RLBaseController]) self.num_agents = len(self.rl_controllers) # make directories for saving the trained models for agent_id in range(self.num_agents): os.makedirs(self.save_path + "/agent" + str(agent_id), exist_ok=True) # system reset if self.start_time is None: self.start_time = self.sys.sample_start_date(fixed_start=self.fixed_start) self.sys.reset(self.start_time) self.model_inst = self.sys.instance # seeding the global random number generator of the torch module (mainly used for weight initialization) # and random & numpy module (will mainly be used for initializers) random.seed(self.seed) np.random.seed(self.seed) torch.manual_seed(self.seed) torch.cuda.manual_seed_all(self.seed) # create environment, including vectorization wrappers def make_vec_env(all_args, system): def get_env_fn(): def init_env(): env = ControlEnv(system=system) if self.wrapper: env = self.wrapper(env) return env return init_env if all_args.n_rollout_threads == 1: return DummyVecEnv([get_env_fn()]) else: return SubprocVecEnv([get_env_fn() for i in range(all_args.n_rollout_threads)]) self.envs = make_vec_env(all_args=self.all_args, system=self.sys) # used to set self._seeds, which will then be used at reset to actually seed the envs self.envs.seed(self.seed) self.eval_envs = make_vec_env(all_args=self.all_args, system=self.sys) self.eval_envs.seed(self.seed) # set up the policies (actor and critic networks) of all agents for agent_id in range(self.num_agents): share_observation_space = ( self.envs.share_observation_space[agent_id] if self.use_centralized_V else self.envs.observation_space[agent_id] ) # policy network po = Policy( self.all_args, self.envs.observation_space[agent_id], share_observation_space, self.envs.action_space[agent_id], device=self.device, ) self.policy.append(po) # set up training algorithm instances and replay buffers for agent_id in range(self.num_agents): # algorithm tr = TrainAlgo(self.all_args, self.policy[agent_id], device=self.device) # buffer share_observation_space = ( self.envs.share_observation_space[agent_id] if self.use_centralized_V else self.envs.observation_space[agent_id] ) bu = SeparatedReplayBuffer( self.all_args, self.envs.observation_space[agent_id], share_observation_space, self.envs.action_space[agent_id], ) ep_info_bu = deque(maxlen=100) self.buffer.append(bu) self.ep_info_buffer.append(ep_info_bu) self.trainer.append(tr) self.warmup() # initialize callback self.callback.init_callback(runner=self)
[docs] def _run(self, n_steps: int = 24): """ Runs the multi-agent RL training algorithm (MAPPO or IPPO) for a given number of time steps and saves the trained policies. Returns: None """ self.prepare_run() start = time.time() episodes = int(self.num_env_steps) // self.episode_length // self.n_rollout_threads print(f"Total episodes: {episodes}") # start logging self.callback.on_training_start(locals(), globals()) # in each episode: collect rollouts and update policies based on them for episode in range(episodes): if self.use_linear_lr_decay: for agent_id in range(self.num_agents): self.trainer[agent_id].policy.lr_decay(episode, episodes) # logging for episode self.callback.on_rollout_start() for step in range(self.episode_length): # Sample actions values, actions, action_log_probs, rnn_states, rnn_states_critic, actions_env = self.collect(step) # Observe reward and next obs # The on-policy training algorithms are based on the deprecated OpenAI gym environment API. # The step() function of the Gymnasium API returns additional information, # the "truncated" variable. We don't need it here, which is why we filter it out in the DummyVecEnv obs, rewards, dones, infos = self.envs.step(actions_env) data = obs, rewards, dones, infos, values, actions, action_log_probs, rnn_states, rnn_states_critic # insert data into buffer self.insert(data) # logging self.callback.update_locals(locals()) num_timesteps = step * self.n_rollout_threads continue_training = self.callback.on_step(num_timesteps) # option to abort early, not used at the moment if not continue_training: break # compute return and update network self.compute() # update policies based on data that is currently in the buffer train_infos = self.train() # post process total_num_steps = (episode + 1) * self.episode_length * self.n_rollout_threads # log information if episode % self.log_interval == 0: end = time.time() for agent_id in range(self.num_agents): # idv_rews = [] # ToDo: How to get individual rewards? Do we need this? # for info in infos: # for count, info in enumerate(infos): # if 'individual_reward' in infos[count][agent_id].keys(): # idv_rews.append(infos[count][agent_id].get('individual_reward', 0)) # train_infos[agent_id].update({'individual_rewards': np.mean(idv_rews)}) self.ep_info_buffer[agent_id].append( np.mean(self.buffer[agent_id].rewards) * int(self.horizon / self.dt) ) train_infos[agent_id].update({"average_episode_rewards": safe_mean(self.ep_info_buffer[agent_id])}) self.log_train(train_infos, total_num_steps, start, end) # invoke callback to log additional information self.callback.on_rollout_end() # this makes sure the logged information is written to the different output formats # (e.g., stdout, tensorboard) self.log_function.dump(total_num_steps) # eval if episode % self.eval_interval == 0 and self.use_eval: self.eval(total_num_steps) # save the models self.save() # end logging self.callback.on_training_end() self.finish_run()
[docs] def warmup(self): """ Pre-training preparations specific to MAPPO/IPPO Returns: None """ # Seeding is considered within DummyVecEnv/SubProcVecEnv through seeds initialized previously # by calling self.envs.seed(seed=self.seed) obs, _ = self.envs.reset() share_obs = [] for o in obs: share_obs.append(list(chain(*o))) share_obs = np.array(share_obs) for agent_id in range(self.num_agents): if not self.use_centralized_V: share_obs = np.array(list(obs[:, agent_id])) self.buffer[agent_id].share_obs[0] = share_obs[0].copy() self.buffer[agent_id].obs[0] = np.array(list(obs[:, agent_id])).copy()
[docs] @torch.no_grad() def collect(self, step: int) -> Tuple[np.array, List[np.array], List[np.array], np.array, np.array, List[np.array]]: """ Obtain actions for the current step based on current policies, observations, shared observations, and hidden states. The masks are not necessary in our case, because all agents terminate at the same time. Args: step (int): The current step within the episode Returns: Tuple: tuple containing: - values (np.array) - actions (List[np.array]) - action probabilities, logarithmic (List[np.array]) - hidden states of recurrent NN actor. Only needed for recurrent policies (np.array) - hidden states of recurrent NN critic. Only needed for recurrent policies (np.array) - environment actions ? not sure, adapted from on-policy BaseRunner (List[np.array]) """ values = [] actions = [] action_log_probs = [] rnn_states = [] rnn_states_critic = [] for agent_id in range(self.num_agents): self.trainer[agent_id].prep_rollout() value, action, action_log_prob, rnn_state, rnn_state_critic = self.trainer[agent_id].policy.get_actions( self.buffer[agent_id].share_obs[step], self.buffer[agent_id].obs[step], self.buffer[agent_id].rnn_states[step], self.buffer[agent_id].rnn_states_critic[step], self.buffer[agent_id].masks[step], ) values.append(t2n(value)) action = t2n(action) actions.append(action) action_log_probs.append(t2n(action_log_prob)) rnn_states.append(t2n(rnn_state)) rnn_states_critic.append(t2n(rnn_state_critic)) actions_env = actions values = np.array(values).transpose(1, 0, 2) # actions = np.array(actions).transpose(1, 0, 2) # action_log_probs = np.array(action_log_probs).transpose(1, 0, 2) rnn_states = np.array(rnn_states).transpose(1, 0, 2, 3) rnn_states_critic = np.array(rnn_states_critic).transpose(1, 0, 2, 3) return values, actions, action_log_probs, rnn_states, rnn_states_critic, actions_env
[docs] def insert( self, data: Tuple[ np.array, np.array, np.array, np.array, np.array, List[np.array], List[np.array], np.array, np.array, ], ) -> None: """ Write information collected during rollout to buffers (one per agent) in the appropriate format (the "SeparatedReplayBuffer" from the on-policy repository logs some information we do not require, like masks for terminated agents). Args: data (Tuple): data collected during rollout which should be inserted into buffers Returns: None """ obs, rewards, dones, infos, values, actions, action_log_probs, rnn_states, rnn_states_critic = data # Our ControlEnv and MAWrapper return one terminated/dones variable per environment and not one per agent, # so we reset all rnn_states if this "dones" is true if dones: for i, rnn_state in enumerate(rnn_states): rnn_states[i] = np.zeros((1, self.recurrent_N, self.hidden_size), dtype=np.float32) rnn_states[i] = np.zeros((1, self.recurrent_N, self.hidden_size), dtype=np.float32) masks = np.ones((self.n_rollout_threads, self.num_agents, 1), dtype=np.float32) # Problem: dones in our env only says if the env is terminated or not, not if a specific agent is done # original: masks[np.array([dones]) is True] = \ # np.zeros(((np.array([dones]) is True).sum(), 1), dtype=np.float32) share_obs = [] for o in obs: share_obs.append(list(chain(*o))) share_obs = np.array(share_obs) for agent_id in range(self.num_agents): if not self.use_centralized_V: share_obs = np.array(list(obs[:, agent_id])) self.buffer[agent_id].insert( share_obs, np.array(list(obs[:, agent_id])), rnn_states[:, agent_id], rnn_states_critic[:, agent_id], actions[agent_id], action_log_probs[agent_id], values[:, agent_id], rewards[:, agent_id], masks[:, agent_id], )
[docs] @torch.no_grad() def eval(self, total_num_steps: int): """ Evaluates current policies on separate eval environment (not used atm). Args: total_num_steps (int): Current training progress Returns: None """ eval_episode_rewards = [] eval_obs, _ = self.eval_envs.reset() eval_rnn_states = np.zeros( (self.n_eval_rollout_threads, self.num_agents, self.recurrent_N, self.hidden_size), dtype=np.float32 ) eval_masks = np.ones((self.n_eval_rollout_threads, self.num_agents, 1), dtype=np.float32) for eval_step in range(self.episode_length): eval_temp_actions_env = [] for agent_id in range(self.num_agents): self.trainer[agent_id].prep_rollout() eval_action, eval_rnn_state = self.trainer[agent_id].policy.act( np.array(list(eval_obs[:, agent_id])), eval_rnn_states[:, agent_id], eval_masks[:, agent_id], deterministic=True, ) eval_action = eval_action.detach().cpu().numpy() # rearrange action if self.eval_envs.action_space[agent_id].__class__.__name__ == "MultiDiscrete": for i in range(self.eval_envs.action_space[agent_id].shape): eval_uc_action_env = np.eye(self.eval_envs.action_space[agent_id].high[i] + 1)[ eval_action[:, i] ] if i == 0: eval_action_env = eval_uc_action_env else: eval_action_env = np.concatenate((eval_action_env, eval_uc_action_env), axis=1) elif self.eval_envs.action_space[agent_id].__class__.__name__ == "Discrete": eval_action_env = np.squeeze(np.eye(self.eval_envs.action_space[agent_id].n)[eval_action], 1) else: raise NotImplementedError eval_temp_actions_env.append(eval_action_env) eval_rnn_states[:, agent_id] = t2n(eval_rnn_state) # [envs, agents, dim] eval_actions_env = [] for i in range(self.n_eval_rollout_threads): eval_one_hot_action_env = [] for eval_temp_action_env in eval_temp_actions_env: eval_one_hot_action_env.append(eval_temp_action_env[i]) eval_actions_env.append(eval_one_hot_action_env) # Observe reward and next obs eval_obs, eval_rewards, eval_dones, eval_infos = self.eval_envs.step(eval_actions_env) eval_episode_rewards.append(eval_rewards) eval_rnn_states[eval_dones is True] = np.zeros( ((eval_dones is True).sum(), self.recurrent_N, self.hidden_size), dtype=np.float32 ) eval_masks = np.ones((self.n_eval_rollout_threads, self.num_agents, 1), dtype=np.float32) eval_masks[eval_dones is True] = np.zeros(((eval_dones is True).sum(), 1), dtype=np.float32) eval_episode_rewards = np.array(eval_episode_rewards) eval_train_infos = [] for agent_id in range(self.num_agents): eval_average_episode_rewards = np.mean(np.sum(eval_episode_rewards[:, :, agent_id], axis=0)) eval_train_infos.append({"eval_average_episode_rewards": eval_average_episode_rewards}) print("eval average episode rewards of agent%i: " % agent_id + str(eval_average_episode_rewards)) self.log_train(eval_train_infos, total_num_steps)
@torch.no_grad() def render(self): # not used pass
[docs] def compute(self): """ Compute returns based on next value (will be needed for loss) Returns: None """ for agent_id in range(self.num_agents): self.trainer[agent_id].prep_rollout() next_value = self.trainer[agent_id].policy.get_values( self.buffer[agent_id].share_obs[-1], self.buffer[agent_id].rnn_states_critic[-1], self.buffer[agent_id].masks[-1], ) next_value = t2n(next_value) self.buffer[agent_id].compute_returns(next_value, self.trainer[agent_id].value_normalizer)
[docs] def train(self) -> List[dict]: """ Perform updates of actor and critic parameters for each agent Returns: List[dict]: list of training metrics dictionary (one list entry per agent) """ train_infos = [] for agent_id in range(self.num_agents): self.trainer[agent_id].prep_training() train_info = self.trainer[agent_id].train(self.buffer[agent_id]) train_infos.append(train_info) self.buffer[agent_id].after_update() return train_infos
[docs] def save(self): """ Save the actor and critic parameters for each agent Returns: None """ for agent_id, agent in enumerate(self.rl_controllers.values()): policy_actor = self.trainer[agent_id].policy.actor actor_save_path = str(self.save_path) + "/agent" + str(agent_id) + "/actor_agent" + ".pt" agent.save(policy_actor, actor_save_path) policy_critic = self.trainer[agent_id].policy.critic critic_save_path = str(self.save_path) + "/agent" + str(agent_id) + "/critic_agent" + ".pt" agent.save(policy_critic, critic_save_path) if self.trainer[agent_id]._use_valuenorm: policy_vnorm = self.trainer[agent_id].value_normalizer vnorm_save_path = str(self.save_path) + "/agent" + str(agent_id) + "/vnrom_agent" + ".pt" agent.save(policy_vnorm, vnorm_save_path)
[docs] def log_train(self, train_infos: List[dict], total_num_steps: int, start: time.time = None, end: time.time = None): """ Args: train_infos (List[dict]): training metrics for each agent total_num_steps (int): current training progress start (time.time): start time of training episode end (time.time): end time of training episode Returns: None """ self.log_function.record("time/total_timesteps", total_num_steps) if start: self.log_function.record("time/fps", int(total_num_steps / (end - start))) for agent_id in range(self.num_agents): for k, v in train_infos[agent_id].items(): agent_k = "agent%i/" % agent_id + k if isinstance(v, torch.Tensor): self.log_function.record(agent_k, v.item()) else: self.log_function.record(agent_k, v)
[docs] def finish_run(self): """ Finish run, mostly needed for deleting global controller and terminating Weights&Biases logger Returns: None """ super().finish_run() self.logger.finish_logging()
[docs] def _set_device(self): """ Set computing device according to algorithm configuration Returns: None """ if self.all_args.cuda and torch.backends.cuda.is_available(): print("choose to use gpu...") self.device = torch.device("cuda") torch.set_num_threads(self.all_args.n_training_threads) if self.all_args.cuda_deterministic: torch.backends.cudnn.benchmark = False torch.backends.cudnn.deterministic = True else: print("choose to use cpu...") self.device = torch.device("cpu") torch.set_num_threads(self.all_args.n_training_threads)
[docs] def _check_alg_config(self): """ Sanity check for the algorithm configuration: If we use any variant of MAPPO, we want a shared observation space which means that use_centralized_V has to be true. If we use a recurrent policy (RMAPPO), the respective arguments have to be true. Returns: None """ # we do not support training with vectorized environments atm if self.all_args.n_rollout_threads > 1: raise ValueError( "Parameter 'n_rollout_threads' has to equal '1' as we do not yet support training with " "multiple environments simultaneously!" ) if self.all_args.algorithm_name == "rmappo": print("You are choosing to use RMAPPO, we set use_recurrent_policy to be True") self.all_args.use_recurrent_policy = True self.all_args.use_naive_recurrent_policy = False self.all_args.use_centralized_V = True elif self.all_args.algorithm_name == "mappo": print("You are choosing to use MAPPO, we set use_recurrent_policy & use_naive_recurrent_policy to be False") self.all_args.use_recurrent_policy = False self.all_args.use_naive_recurrent_policy = False self.all_args.use_centralized_V = True elif self.all_args.algorithm_name == "ippo": print("You are choosing to use IPPO, we set use_centralized_V to be False") self.all_args.use_centralized_V = False else: raise NotImplementedError
[docs] def _parse_alg_config(self): """ Write algorithm configuration to class attributes Returns: None """ # parameters self.algorithm_name = self.all_args.algorithm_name self.use_centralized_V = self.all_args.use_centralized_V self.num_env_steps = self.all_args.num_env_steps self.episode_length = self.all_args.episode_length self.n_rollout_threads = self.all_args.n_rollout_threads self.n_eval_rollout_threads = self.all_args.n_eval_rollout_threads self.use_linear_lr_decay = self.all_args.use_linear_lr_decay self.hidden_size = self.all_args.hidden_size self.recurrent_N = self.all_args.recurrent_N # interval self.use_eval = self.all_args.use_eval self.eval_interval = self.all_args.eval_interval self.log_interval = self.all_args.log_interval if wandb.run: self.save_path = str(self.callback.model_save_path) else: if not os.path.exists(self.save_path): os.makedirs(self.save_path)