"""
Wrappers to adjust API in environments.py to different RL training algorithms.
"""
from collections import OrderedDict, deque
from functools import partial
from typing import List, Optional, Tuple, Union
import gymnasium as gym
import numpy as np
from commonpower.control.environments import ControlEnv
from commonpower.control.parsing import ParserFactory
from commonpower.utils.tuple_db import RLTuple, TupleDB
def ctrl_dict_to_list(input_dict: dict) -> list:
    """
    Collect the values of a controller dictionary in a plain list.

    The entries appear in the iteration order of ``input_dict``; the keys are discarded.

    Args:
        input_dict (dict): dictionary {agent_id: value}

    Returns:
        list: new list holding the values of ``input_dict``, one entry per agent
    """
    return list(input_dict.values())
def list_to_ctrl_dict(input_list: list, original_keys: dict) -> dict:
    """
    Rebuild the nested control dictionary from a list of per-agent action arrays.

    The n-th entry of ``input_list`` is matched with the n-th agent of ``original_keys``. Every entry is indexed as
    ``entry[0, k]``, so it has to be a 2D array whose first row contains the flattened actions of that agent, in the
    order in which the element ids are listed in ``original_keys``.

    Args:
        input_list (list): list of control actions for each agent
        original_keys (dict): nested dictionary of original action keys for each agent
            as {agent_id: {node_id: list[element_ids]}}

    Returns:
        dict: nested dictionary {agent_id: {node_id: {element_id: np.ndarray with a single entry}}}
    """
    ctrl_dict = {}
    for agent_idx, (agent_id, node_keys) in enumerate(original_keys.items()):
        # position of the current node's first action within the agent's flat action row
        offset = 0
        per_node = {}
        for node_id, element_ids in node_keys.items():
            per_node[node_id] = {
                element_id: np.array([input_list[agent_idx][0, offset + pos]])
                for pos, element_id in enumerate(element_ids)
            }
            offset += len(element_ids)
        ctrl_dict[agent_id] = per_node
    return ctrl_dict
def recursive_items(dictionary):
    """
    Lazily walk a nested dictionary or gym.spaces.Dict and yield all of its leaf entries.

    Nested mappings are traversed depth first in their iteration order. Only the innermost key is reported for each
    leaf; the keys of the enclosing mappings are not part of the output.

    Args:
        dictionary (dict | gym.spaces.Dict): (possibly nested) mapping to traverse

    Yields:
        tuple: (key, value) for every value that is neither a dict nor a gym.spaces.Dict
    """
    for name, entry in dictionary.items():
        if not isinstance(entry, (gym.spaces.Dict, dict)):
            yield name, entry
            continue
        # descend into the nested mapping instead of yielding it
        yield from recursive_items(entry)
class WrapperStack:
    """
    Ordered collection of environment wrappers which can be applied to an environment in a single call.

    Wrappers are applied in the order in which they were added, so the wrapper added first ends up innermost.
    """

    def __init__(self):
        # list of (wrapper_class, keyword_arguments) pairs in the order of application
        self.wrappers = []

    def add(self, wrapper: "type[gym.Wrapper]", **kwargs) -> "WrapperStack":
        """
        Append a wrapper to the stack.

        Args:
            wrapper (type[gym.Wrapper]): wrapper class (not an instance). It is called as ``wrapper(env, **kwargs)``
                when the stack is applied.
            **kwargs: keyword arguments handed to the wrapper on construction

        Returns:
            WrapperStack: the stack itself, so that calls can be chained
        """
        self.wrappers.append((wrapper, kwargs))
        return self

    def get_stack(self) -> partial:
        """
        Create a function which wraps an environment with all wrappers of this stack.

        The returned function keeps a reference to the wrapper list of this stack (no copy is made), so wrappers
        which are added afterwards are applied by it as well.

        Returns:
            partial: callable taking the environment as its only argument and returning the wrapped environment
        """

        def wrap_func(env: "gym.Env", wrappers: list):
            for wrapper_cls, wrapper_kwargs in wrappers:
                env = wrapper_cls(env, **wrapper_kwargs)
            return env

        return partial(wrap_func, wrappers=self.wrappers)
class DeploymentWrapper(gym.Wrapper):
    def __init__(self, env):
        """
        Wrapper to standardize the deployment within CommonPower. Mainly takes care of transforming actions and
        observations such that they match the underlying wrappers (e.g., SingleAgentWrapper, MultiAgentWrapper, ...).

        Args:
            env (gym.Environment): potentially wrapped ControlEnv

        Returns:
            DeploymentWrapper
        """
        super().__init__(env)
        self.env = env
        # the factory picks the parser based on the environment it is given
        self.parser = ParserFactory(env).get_parser()

    def step(self, action: Union[OrderedDict, None]) -> Tuple[dict, dict, bool, bool, dict]:
        """
        Step the wrapped environment with a parsed action and return the parsed observation.

        Args:
            action (Union[OrderedDict, None]): action to apply. ``None`` is handed to the wrapped environment
                unchanged, any other value is first converted by the parser.

        Returns:
            Tuple: (parsed observation, reward, done, truncated, info) where everything except the observation is
            passed through from the wrapped environment
        """
        # convert the action to the format required by the underlying environment/ wrapper stack
        if action is not None:
            action = self.parser.parse_action(action)
        obs, reward, done, truncated, info = self.env.step(action)
        obs = self.parser.parse_obs(obs)
        return obs, reward, done, truncated, info

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ) -> Tuple[dict, dict]:
        """
        Reset the wrapped environment and return the parsed initial observation.

        Args:
            seed (Optional[int]): seed handed to the reset of the wrapped environment
            options (Optional[dict]): options handed to the reset of the wrapped environment

        Returns:
            Tuple: (parsed observation, info returned by the wrapped environment)
        """
        # forward seed and options; dropping them would silently disable seeding of the wrapped environment
        obs, obs_info = self.env.reset(seed=seed, options=options)
        obs = self.parser.parse_obs(obs)
        return obs, obs_info
class SingleAgentWrapper(gym.Wrapper):
    def __init__(self, env):
        """
        Wrapper which exposes a ControlEnv with exactly one controller through the single-agent API (flat Box
        observation and action spaces) used for RL training with the algorithms of the StableBaselines 3 repository.

        Args:
            env (ControlEnv): power system environment with multi-agent API

        Returns:
            SingleAgentWrapper

        Raises:
            ValueError: if the environment has more than one controller
        """
        super().__init__(env)
        self.env = env
        controllers = self.env.get_wrapper_attr("controllers")
        if len(controllers) > 1:
            raise ValueError("SingleAgentWrapper cannot handle more than 1 agent")
        self.ctrl_id = list(controllers.keys())[0]

        # training history
        self.train_history = {}
        self.episode_history = deque(maxlen=100)

        # flatten the nested observation space of the controller into a single box;
        # the leading empty array makes the concatenation well defined (and float64) without any leaf
        obs_lows, obs_highs = [np.array([])], [np.array([])]
        for _, leaf_space in recursive_items(self.env.observation_space[self.ctrl_id]):
            obs_lows.append(leaf_space.low)
            obs_highs.append(leaf_space.high)
        self.observation_space = gym.spaces.Box(
            low=np.concatenate(obs_lows), high=np.concatenate(obs_highs), dtype=np.float64
        )

        # same for the action space, which is nested as {node_id: {element_id: space}}
        act_lows, act_highs = [np.array([])], [np.array([])]
        for node_space in self.env.action_space[self.ctrl_id].values():
            for element_space in node_space.values():
                act_lows.append(element_space.low)
                act_highs.append(element_space.high)
        self.action_space = gym.spaces.Box(
            low=np.concatenate(act_lows), high=np.concatenate(act_highs), dtype=np.float64
        )

    def reset(self, *, seed=None, options=None):
        """
        Reset the wrapped environment and flatten the initial observation.

        Args:
            seed: seed handed to the reset of the wrapped environment
            options: options handed to the reset of the wrapped environment

        Returns:
            Tuple: (flat observation array, info returned by the wrapped environment)
        """
        nested_obs, obs_info = self.env.reset(seed=seed, options=options)
        return self._unpack_obs(nested_obs), obs_info

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]:
        """
        Step function with the single-agent API (takes numpy array action and outputs numpy array observation)

        Args:
            action (np.ndarray): action selected by the RL policy

        Returns:
            Tuple: tuple containing:
                - single-agent observation (np.ndarray)
                - single-agent reward (float)
                - whether the environment is terminated (bool)
                - whether the environment is truncated (bool)
                - additional information (dict)
        """
        # a sample of the nested action space serves as a template with the right structure;
        # all of its entries for our controller are overwritten with slices of the flat action
        nested_action = self.env.action_space.sample()
        cursor = 0
        for node_actions in nested_action[self.ctrl_id].values():
            for element_id, template in node_actions.items():
                width = template.shape[0]
                node_actions[element_id] = action[cursor : cursor + width]
                cursor += width

        nested_obs, rewards, terminated, truncated, info = self.env.step(nested_action)
        flat_obs = self._unpack_obs(nested_obs)
        ctrl_reward = rewards[self.ctrl_id]

        # at the end of an episode, mirror the histories of the wrapped environment for our controller
        if terminated or truncated:
            self.train_history = self.env.get_wrapper_attr("train_history")[self.ctrl_id]
            self.episode_history = self.env.get_wrapper_attr("episode_history")[self.ctrl_id]

        return flat_obs, ctrl_reward, terminated, truncated, info

    def _unpack_obs(self, obs: dict) -> np.ndarray:
        """
        Convert dictionary of {agent_id: observation_dict} to the flattened observation array of our controller.

        Args:
            obs (dict): observation dictionary {agent_id: observation_dict}

        Returns:
            np.ndarray: flat array of observations
        """
        # the leading empty array keeps the result well defined if there are no observations at all
        pieces = [np.array([])]
        pieces.extend(leaf for _, leaf in recursive_items(obs[self.ctrl_id]))
        return np.concatenate(pieces)
class RecordTransitionsWrapper(gym.Wrapper):
    def __init__(
        self,
        env: ControlEnv,
        scenario_id: str,
        run_config: dict,
        seed: int,
        tuple_db: TupleDB,
        buffer_size: int = 100,
        write_buffer_on_done: bool = True,
    ):
        """
        Wrapper for recording transition tuples (s, a, r, terminal, timeout) in a tuple database.
        NOTE: Currently only available for single-agent RL!

        Args:
            env (gym.Env): The gym environment to be wrapped.
            scenario_id (str): Passed to ``tuple_db.create_run`` when the wrapper is created.
            run_config (dict): Passed to ``tuple_db.create_run`` when the wrapper is created.
            seed (int): Passed to ``tuple_db.create_run`` when the wrapper is created.
            tuple_db (TupleDB): The database for storing the transition tuples.
            buffer_size (int, optional): The maximum size of the tuple buffer. Defaults to 100.
            write_buffer_on_done (bool, optional): Whether to always write out the buffer on a done state.
                Defaults to True.

        Raises:
            ValueError: if the environment has more than one controller
        """
        super().__init__(env)
        if len(env.get_wrapper_attr("controllers")) > 1:
            raise ValueError("RecordTransitionsWrapper cannot handle more than 1 agent")
        self.tuple_db = tuple_db
        self.buffer_size = buffer_size
        self.write_buffer_on_done = write_buffer_on_done
        self.tuple_buffer: List[RLTuple] = []
        # next obs structure ensures that (s, a, r, s') are collected in the correct order
        # necessary for d3rlpy library
        self.current_obs = None
        self.tuple_db.create_run(scenario_id, run_config, seed)

    def step(self, action):
        """
        Step the wrapped environment and record the transition which starts in the previously seen observation.

        The buffer is written to the database once it holds ``buffer_size`` tuples, or at the end of an episode if
        ``write_buffer_on_done`` is set.

        Args:
            action (np.ndarray): action to apply

        Returns:
            Tuple: (next observation, reward, terminated, truncated, info) as returned by the wrapped environment
        """
        # TODO: support raw information in the future as well
        # checked before stepping so that a failing check does not leave the environment advanced by a
        # transition which was never recorded (current_obs is None until reset() has been called)
        assert isinstance(self.current_obs, np.ndarray) and isinstance(
            action, np.ndarray
        ), "observation and action can only be numpy arrays for now"
        next_obs, reward, terminated, truncated, info = self.env.step(action)
        self.tuple_buffer.append(
            RLTuple(
                observation=self.current_obs,
                action=action,
                reward=reward,
                terminal=terminated,
                timeout=truncated,
            )
        )
        episode_done = terminated or truncated
        if len(self.tuple_buffer) >= self.buffer_size or (episode_done and self.write_buffer_on_done):
            self._flush_buffer()
        self.current_obs = next_obs
        return next_obs, reward, terminated, truncated, info

    def reset(self, **kwargs):
        """
        Reset the wrapped environment and remember the initial observation as start of the next transition.

        Args:
            **kwargs: handed to the reset of the wrapped environment

        Returns:
            Tuple: (observation, info) as returned by the wrapped environment
        """
        obs, info = self.env.reset(**kwargs)
        self.current_obs = obs
        return obs, info

    def close(self):
        """
        Write out the tuples which are still buffered and close the wrapped environment.

        Without this, the tuples recorded since the last write would be lost whenever the environment is closed
        before the buffer is full (e.g., if training stops in the middle of an episode).
        """
        self._flush_buffer()
        return super().close()

    def _flush_buffer(self) -> None:
        """Write all buffered tuples to the database and start a new buffer. Does nothing for an empty buffer."""
        if self.tuple_buffer:
            self.tuple_db.record_tuples(self.tuple_buffer)
            self.tuple_buffer = []
class MultiAgentWrapper(gym.Wrapper):
    def __init__(self, env):
        """
        Wrapper to standardize ControlEnv to the API for MAPPO/IPPO implementation of the on-policy repository
        (https://github.com/marlbenchmark/on-policy/tree/main/onpolicy). NOTE: We use our own fork of this repository,
        see the Readme file.

        Args:
            env (ControlEnv): power system environment with multi-agent API

        Returns:
            MultiAgentWrapper
        """
        super().__init__(env)
        self.env = env
        self.n_agents = len(self.get_wrapper_attr("controllers"))
        # training history
        self.train_history = {}
        self.episode_history = {}
        # the MAPPO/IPPO implementation expects the action/observation space as a list with one space per agent
        self.action_space, self.original_action_keys = self.act_space_dict_to_list(self.action_space)
        self.observation_space = self.obs_space_dict_to_list(self.observation_space)
        # The shared observation space is a list with as many entries as we have agents. Each entry contains a numpy
        # array with the stacked observation space of all agents for now
        # (even if there are redundant observations)
        # TODO: remove redundant observations
        total_n_obs = sum(len(agent_obs_space.low) for agent_obs_space in self.observation_space)
        share_low = np.empty(shape=(total_n_obs,))
        share_high = np.empty(shape=(total_n_obs,))
        n_obs = 0
        for agent_obs in self.observation_space:
            n_agent_obs = len(agent_obs.low)
            share_low[n_obs : n_obs + n_agent_obs] = agent_obs.low
            share_high[n_obs : n_obs + n_agent_obs] = agent_obs.high
            n_obs += n_agent_obs
        # stored on the unwrapped environment (not on this wrapper)
        self.unwrapped.share_observation_space = [
            gym.spaces.Box(low=share_low, high=share_high) for _ in range(self.n_agents)
        ]

    def reset(self, *, seed=None, options=None):
        """
        Reset the wrapped environment and convert the initial observations to the list format.

        Args:
            seed: seed handed to the reset of the wrapped environment
            options: options handed to the reset of the wrapped environment

        Returns:
            Tuple: (list with the flat observation array of each agent, info returned by the wrapped environment)
        """
        obs, obs_info = self.env.reset(seed=seed, options=options)
        obs = ctrl_dict_to_list(self._unpack_obs(obs))
        return obs, obs_info

    def step(self, action: List[np.ndarray]) -> Tuple[List[np.ndarray], List[float], bool, bool, dict]:
        """
        Advance the environment (in our case, the power system) by one step in time by applying control actions to
        discrete-time dynamics and updating data sources. Handled within the System class. The actions of the RL agent
        are selected within the RL training algorithm and are passed on to the power system using a callback. After the
        system update, a reward is computed which indicates how good the action selected by the algorithm was in the
        current state. This reward is passed to the training algorithm to gradually improve the policies of the RL
        agents.

        Args:
            action (List[np.ndarray]): actions of RL agents (here as a list of numpy arrays)

        Returns:
            Tuple: tuple containing:
                - observations of all RL agents, here as a list of observations of each agent as numpy arrays (list).
                - rewards of all RL agents (list).
                - whether the episode has terminated (bool). We assume that all agents terminate an episode at the
                  same time, as we have a centralized time management. Always false for continuous control
                - same as above (bool), but the gymnasium API makes a difference between terminated and truncated,
                  which can be useful for other environments but is not needed in our case
                - additional information (dict)
        """
        # transform the actions from a list of numpy arrays to a nested dictionary
        # {agent_id: {node_id: {element_id: action, ...}, ...}, ...} with the original keys from the ControlEnv
        action_dict = list_to_ctrl_dict(action, self.original_action_keys)
        # step original ControlEnv with the transformed action_dict
        obs, rewards, terminated, truncated, info = self.env.step(action_dict)
        # convert observation dictionary to list of observations
        obs = ctrl_dict_to_list(self._unpack_obs(obs))
        # at the end of an episode, mirror the histories of the wrapped environment as lists
        if terminated or truncated:
            self.train_history = ctrl_dict_to_list(self.env.get_wrapper_attr("train_history"))
            self.episode_history = ctrl_dict_to_list(self.env.get_wrapper_attr("episode_history"))
        rewards = ctrl_dict_to_list(rewards)
        return obs, rewards, terminated, truncated, info

    def _unpack_obs(self, obs: dict) -> dict:
        """
        Convert dictionary of {agent_id: observation_dict} to a dictionary of
        {agent_id: flattened observation array}.

        Args:
            obs (dict): observation dictionary {agent_id: observation_dict}

        Returns:
            dict: dictionary {agent_id: flat array of observations}, with one entry per controller of the
            environment
        """
        new_obs_dict = {}
        for ctrl_id in list(self.env.get_wrapper_attr("controllers").keys()):
            # start from an empty array so that a controller without observations yields an empty array
            new_obs = np.array([])
            for _, el_obs in recursive_items(obs[ctrl_id]):
                new_obs = np.concatenate((new_obs, el_obs))
            new_obs_dict[ctrl_id] = new_obs
        return new_obs_dict

    def act_space_dict_to_list(self, action_space: dict) -> Tuple[List[gym.spaces.Box], dict]:
        """
        Transforms an action space in the form of a nested dictionary into a list of Box spaces for each agent.
        Returns the original keys to allow re-transformation. As a side effect, the flattened space of each agent is
        stored as ``flattened_input_space`` on the respective controller.

        Args:
            action_space (dict): nested dictionary of {agent_id: {node_id: {element_id: el_action_space}}}

        Returns:
            Tuple: tuple containing:
                - list of flattened agent action spaces (List[gym.spaces.Box])
                - dictionary with original actions keys from the action space received as an input (dict), as
                  {agent_id: {node_id: list[element_ids]}}
        """
        action_keys = {}
        env_action_space = []
        for agent_id, agent_action_space in action_space.items():
            agent_action_keys = {}
            # lower and upper limits for Box spaces
            agent_lower = np.array([])
            agent_higher = np.array([])
            for node_id, node_action_space in agent_action_space.items():
                agent_action_keys[node_id] = list(node_action_space.keys())
                for element_action_space in node_action_space.values():
                    agent_lower = np.concatenate((agent_lower, element_action_space.low))
                    agent_higher = np.concatenate((agent_higher, element_action_space.high))
            action_keys[agent_id] = agent_action_keys
            flat_agent_action_space = gym.spaces.Box(low=agent_lower, high=agent_higher)
            self.get_wrapper_attr("controllers")[agent_id].flattened_input_space = flat_agent_action_space
            env_action_space.append(flat_agent_action_space)
        return env_action_space, action_keys

    def obs_space_dict_to_list(self, observation_space: dict) -> List[gym.spaces.Box]:
        """
        Transforms the observation space in the form of a nested dictionary into a list of Box spaces for each agent.
        As a side effect, the flattened space of each agent is stored as ``flattened_obs_space`` on the respective
        controller.

        Args:
            observation_space (dict): nested dictionary of {agent_id: {node_id: {element_id: el_obs_space}}}

        Returns:
            List[gym.spaces.Box]: list of flattened agent observation spaces
        """
        env_obs_space = []
        for agent_id, agent_obs_space in observation_space.items():
            lower = np.array([])
            higher = np.array([])
            for _, element_obs_space in recursive_items(agent_obs_space):
                lower = np.concatenate((lower, element_obs_space.low))
                higher = np.concatenate((higher, element_obs_space.high))
            flat_agent_obs_space = gym.spaces.Box(low=lower, high=higher)
            self.env.get_wrapper_attr("controllers")[agent_id].flattened_obs_space = flat_agent_obs_space
            env_obs_space.append(flat_agent_obs_space)
        return env_obs_space