"""
Base API (based on gymnasium API) between controlled system and RL training algorithms.
"""
from collections import OrderedDict, deque
from copy import copy, deepcopy
from datetime import datetime
from typing import Optional, Tuple, Union
import gymnasium as gym
import numpy as np
from commonpower.modeling.base import ControllableModelEntity
from commonpower.modeling.history import ModelHistory
from commonpower.utils.cp_exceptions import ControllerError
# NOTE: a stray "[docs]" link marker from the rendered documentation page was removed here.
class ControlEnv(gym.Env):
    """
    Gymnasium environment that connects a controllable power system to RL training algorithms.

    The environment collects all RL controllers of the given system and exposes their observation and action
    spaces as ``gym.spaces.Dict`` objects keyed by controller ID. Observations, rewards and actions exchanged via
    :meth:`step` and :meth:`reset` are dictionaries keyed by the same controller IDs.
    """

    def __init__(
        self,
        system: ControllableModelEntity,
        continuous_control: bool = False,
        episode_length: int = 24,
        fixed_start: Optional[datetime] = None,
        normalize_action_space: bool = True,
        history: Optional[ModelHistory] = None,
    ):
        """
        Class that provides the interface between our power system and any reinforcement learning algorithm. Based on
        the OpenAI Gym API (which is now maintained as 'gymnasium', see https://gymnasium.farama.org/). Manages all
        RL controllers within the power system.

        Args:
            system (ControllableModelEntity): power system including Pyomo model with all constraints
            continuous_control (bool): if True, the environment never signals the end of an episode
            episode_length (int): how many environment interaction steps to complete before the episode is truncated
            fixed_start (datetime): if None, we will train from multiple random start times.
                Otherwise, we will always train from the same start time.
            normalize_action_space (bool): whether to normalize the action space to [-1,1]
            history (ModelHistory): logger which is passed to the system in every step and reset with the environment
        """
        # imported inside the constructor instead of at module level
        # (presumably to avoid a circular import - TODO confirm)
        from commonpower.control.controllers import RLBaseController

        self.sys = system
        # mapping of controller ID -> RL controller
        self.controllers = system.get_controllers(ctrl_types=[RLBaseController])
        # last (de-normalized) action handed to step(); read by rl_action_callback()
        self.current_action = None
        # per-agent controller history captured at the end of the most recent episode
        self.train_history = {}
        # per-agent summaries of the most recent episodes (at most 100 are kept)
        self.episode_history = {agent_id: deque(maxlen=100) for agent_id in self.controllers.keys()}
        self.normalize_actions = normalize_action_space
        self.system_history = history
        # training or deployment?
        self.train = True

        # ToDo: shared observation space?
        self.observation_space = self._get_observation_space()
        if self.normalize_actions:
            self.action_space, self.original_action_space = self._get_normalized_action_space()
        else:
            self.action_space = self._get_action_space()
            # without normalization the exposed action space already is the original one; defining the attribute
            # in both cases avoids an AttributeError for code that reads it unconditionally
            self.original_action_space = self.action_space

        # whether to just continuously step through the year or not
        self.continuous_control = continuous_control
        # step counter
        self.completed_steps = 0
        self.episode_length = episode_length
        # whether or not to train on a fixed day
        self.fixed_start = fixed_start

    def set_mode(self, mode: str):
        """
        Switch the environment and all of its RL controllers between training and deployment.

        Args:
            mode (str): "train" enables training mode, any other value disables it. The value is forwarded
                unchanged to ``set_mode()`` of every RL controller.
        """
        for ctrl in self.controllers.values():
            ctrl.set_mode(mode)
        self.train = mode == "train"

    def step(self, action: Union[OrderedDict, None]) -> Tuple[dict, dict, bool, bool, dict]:
        """
        Advance the environment (in our case, the power system) by one step in time by applying control actions to
        discrete-time dynamics and updating data sources. Handled within the System class. The actions of the RL agent
        are selected within the RL training algorithm and are passed on to the power system using a callback. After the
        system update, a reward is computed which indicates how good the action selected by the algorithm was in the
        current state. This reward is passed to the training algorithm to gradually improve the policies of the RL
        agents.

        Args:
            action (OrderedDict): actions of RL agents (here as a dictionary of agent IDs and their respective
                actions). If None, the previously stored action is left untouched.

        Returns:
            Tuple: tuple containing:
                - observations of all RL agents (dict), here as a dictionary of agent IDs and their respective \
                    observations
                - rewards of all RL agents (dict)
                - whether the episode has terminated (bool). We assume that all agents terminate an episode at the \
                    same time, as we have a centralized time management. Always false for continuous control
                - same as above but the gymnasium API makes a difference between terminated and truncated, which can \
                    be useful for other environments but is not needed in our case
                - additional information (dict)

        Raises:
            ControllerError: if the format of the action does not match the number of RL controllers
        """
        if action is not None:
            n_controllers = len(self.controllers)
            # self.controllers is a dict keyed by controller ID, so positional indexing ([0]) would raise a
            # KeyError instead of the intended ControllerError; we report the first controller instead
            if n_controllers == 1 and isinstance(action, list):
                raise ControllerError(next(iter(self.controllers.values())), "One agent but multiple actions")
            if n_controllers > 1 and isinstance(action, float):
                raise ControllerError(next(iter(self.controllers.values())), "Multiple agents but only one action")
            # store action (in the original, i.e., de-normalized, input space)
            if self.normalize_actions:
                action = self._denormalize_action(action)
            self.current_action = action

        obs, costs, info = self.sys.step(rl_action_callback=self.rl_action_callback, history=self.system_history)
        self.completed_steps += 1
        terminated, truncated = self._is_done()

        # extract only the info for the RL controllers
        # the obs_handler of each controller will
        # a) take care of removing unwanted forecasts and
        # b) stack past observations if specified
        obs = {
            agent: self.controllers[agent].obs_handler.get_adjusted_obs(agent_obs)
            for agent, agent_obs in obs.items()
            if agent in self.controllers
        }
        # rewards are negative costs
        rewards = {agent: -agent_cost for agent, agent_cost in costs.items() if agent in self.controllers}

        # log the reward with the most recent safety penalty added back
        for agent_id, agent in self.controllers.items():
            last_penalty = agent.history["safety_penalty"][-1][1]
            agent.update_history({"reward_without_penalty": rewards[agent_id] + last_penalty})

        # get train history at end of episode
        if terminated or truncated:
            self._record_episode_summary()

        return obs, rewards, terminated, truncated, info

    def _record_episode_summary(self) -> None:
        """
        Snapshot the controller histories and append one summary entry per agent to ``self.episode_history``.
        """
        self.train_history = {agent_id: copy(agent.history) for agent_id, agent in self.controllers.items()}
        for agent_id in self.controllers.keys():
            agent_history = self.train_history[agent_id]
            # history entries are indexable records; the logged value is stored at index 1
            self.episode_history[agent_id].append(
                {
                    "mean_penalty": np.mean([t[1] for t in agent_history["safety_penalty"]]),
                    "rew_without_penalty": np.sum([t[1] for t in agent_history["reward_without_penalty"]]),
                    "n_corrections": np.sum([t[1] for t in agent_history["action_corrected"]]),
                }
            )

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ) -> Tuple[dict, dict]:
        """
        Reset the power system to the beginning of a new episode.

        In training mode the system is always reset to a newly sampled start time. In deployment mode the system is
        only reset here if all RL controllers use pre-trained policies.

        Args:
            seed: The seed that is used to initialize the environment's PRNG (np_random). If the environment does not
                already have a PRNG and seed=None (the default option) is passed, a seed will be chosen from some \
                source of entropy (e.g. timestamp or /dev/urandom). However, if the environment already has a PRNG and \
                seed=None is passed, the PRNG will not be reset. If you pass an integer, the PRNG will be reset even \
                if it already exists. Usually, you want to pass an integer right after the environment has been \
                initialized and then never again. This is taken care of by calling super().reset(seed=seed) in \
                the first line of this function. (https://gymnasium.farama.org/api/env/#gymnasium.Env.reset)
            options: not needed

        Returns:
            Tuple: tuple containing
                - observations of all RL agents after reset (dict)
                - additional information for observations (dict)

        Raises:
            ValueError: if, in deployment mode, only some of the RL controllers are pre-trained
        """
        # let the gymnasium Env base class handle the seeding of the PRNG
        super().reset(seed=seed)
        self.completed_steps = 0
        reset_time = self.sys.sample_start_date(self.fixed_start)

        # reset system history
        if self.system_history:
            self.system_history.reset()

        if self.train:
            self.sys.reset(reset_time)
        elif len(self.controllers) > 0:
            # environment is already reset once in deployment runner. We only have to reset it once more here
            # in case we use pre-trained RL policies (load them from a directory)
            # controllers without a load_path attribute are treated as not pre-trained
            pretrained_controllers = [
                getattr(ctrl, "load_path", None) is not None for ctrl in self.controllers.values()
            ]
            only_pretrained_controllers = all(pretrained_controllers)
            no_pretrained_controllers = not any(pretrained_controllers)
            if not (only_pretrained_controllers or no_pretrained_controllers):
                raise ValueError("The controllers all have to be either pre-trained or not. Mixing not possible.")
            # we have to reset the system again because when loading the policies with single-agent RL,
            # the env is seeded...
            if only_pretrained_controllers:
                self.sys.reset(reset_time)

        obs, obs_info = self.sys.observe()
        # extract only the info for the RL controllers
        # the obs_handler of each controller will
        # a) take care of removing unwanted forecasts and
        # b) stack past observations if specified
        obs = {
            agent: self.controllers[agent].obs_handler.get_adjusted_obs(agent_obs)
            for agent, agent_obs in obs.items()
            if agent in self.controllers
        }
        return obs, obs_info

    def rl_action_callback(self, ctrl_id: str):
        """
        Passes current action selected by training algorithm to the compute_control_input() function of the
        BaseController class.

        Args:
            ctrl_id (str): ID of the controller for which to retrieve the action

        Returns:
            dict: actions for all controlled entities assigned to this controller
        """
        return self.current_action[ctrl_id]

    def _is_done(self) -> Tuple[bool, bool]:
        """
        Determines whether the environment has to be reset. "Done" normally means that a goal has been reached,
        which is never the case in power systems control. It can also mean that a safety violation occurred
        (which also should not happen in our case, but could be implemented in case we want to let a system fail.)
        "Truncated" means that we have reached the end of a pre-defined time limit and therefore want to reset.
        We currently assume that all agents terminate an episode at the same time, as we have a centralized time
        management.

        Returns:
            tuple(bool, bool): Done (always False), truncated
        """
        done = False
        # ">=" instead of "==": if the environment is stepped beyond the episode length without a reset,
        # it keeps reporting the truncation instead of silently continuing
        truncated = (not self.continuous_control) and self.completed_steps >= self.episode_length
        return done, truncated

    def _get_observation_space(self) -> gym.spaces.Dict:
        """
        Retrieve observation space from list of RL controllers and their observation masks.

        Returns:
            gym.spaces.Dict: dictionary of agent IDs and their observation spaces
        """
        # ToDo: What happens in case we don't have a box space?
        obs_spaces = OrderedDict()
        for ctrl_id, ctrl in self.controllers.items():
            obs_spaces[ctrl_id] = ctrl.obs_handler.get_observation_space(ctrl.get_nodes())
        return gym.spaces.Dict(obs_spaces)

    def _get_action_space(self) -> gym.spaces.Dict:
        """
        Retrieve (non-normalized) action space from RL controllers.

        Returns:
            gym.spaces.Dict: dictionary of agent IDs and their action spaces
        """
        # ToDo: What happens in case we don't have a box space?
        act_spaces = OrderedDict()
        for ctrl_id, ctrl in self.controllers.items():
            act_spaces[ctrl_id] = ctrl.get_input_space(normalize=False)
        return gym.spaces.Dict(act_spaces)

    def _get_normalized_action_space(self) -> Tuple[gym.spaces.Dict, gym.spaces.Dict]:
        """
        Retrieve both the normalized ([-1,1]) and the original action spaces of the RL controllers.

        Returns:
            Tuple[gym.spaces.Dict, gym.spaces.Dict]: normalized action spaces and original action spaces, each as a
                dictionary of agent IDs and their action spaces
        """
        # ToDo: What happens in case we don't have a box space?
        act_spaces = OrderedDict()
        norm_act_spaces = OrderedDict()
        for ctrl_id, ctrl in self.controllers.items():
            act_spaces[ctrl_id] = ctrl.get_input_space(normalize=False)
            norm_act_spaces[ctrl_id] = ctrl.get_input_space(normalize=True)
        return gym.spaces.Dict(norm_act_spaces), gym.spaces.Dict(act_spaces)

    def _denormalize_action(self, action: OrderedDict) -> OrderedDict:
        """
        Denormalize action to original input space.

        Every leaf of the nested action dictionary (controller ID -> node ID -> element ID) is mapped linearly
        from [-1, 1] to the [low, high] bounds of the corresponding entry in ``self.original_action_space``.
        The passed dictionary is not modified.

        Args:
            action (OrderedDict): normalized action

        Returns:
            OrderedDict: copy of the action with all leaves scaled to the original input space
        """
        scaled_action = deepcopy(action)
        for ctrl_id, ctrl_action in action.items():
            for node_id, node_action in ctrl_action.items():
                for el_id, el_action in node_action.items():
                    el_space = self.original_action_space[ctrl_id][node_id][el_id]
                    action_low, action_high = el_space.low, el_space.high
                    # [-1, 1] -> [0, 1]; computed in float64 to keep the precision of the result
                    unit_action = (np.asarray(el_action, dtype=np.float64) + 1.0) / 2.0
                    # [0, 1] -> [low, high]
                    scaled_action[ctrl_id][node_id][el_id] = unit_action * (action_high - action_low) + action_low
        return scaled_action