"""
Collection of safety layers.
"""
import logging
from copy import deepcopy
from typing import Dict, List, Tuple
from pyomo.core import ConcreteModel, Objective, quicksum
from pyomo.core.expr.numeric_expr import SumExpression
from pyomo.environ import value
from pyomo.opt import TerminationCondition
from pyomo.opt.solver import OptSolver
from commonpower.control.safety_layer.penalties import BasePenalty, DistanceDependingPenalty
from commonpower.control.util import clone_from_top_level_nodes
from commonpower.modeling.base import ModelEntity
from commonpower.utils.cp_exceptions import EntityError
from commonpower.utils.default_solver import get_default_solver
logging.getLogger().setLevel(logging.ERROR)
[docs]
class BaseSafetyLayer:
def __init__(self):
"""
Base class for safety layers. A safety layer checks whether the action selected by a controller violates any
constraints of the controlled entities and adjusts the actions if necessary.
Returns:
BaseSafetyLayer
"""
self.nodes = None
self.top_level_nodes = None
self.obj_fcn = None
self.unsafe_action = None
[docs]
def initialize(self, nodes: List[ModelEntity], top_level_nodes: List[ModelEntity]):
"""
Initializes the safety layer
Args:
nodes (List[ModelEntity]):
list of controlled entities to be safeguarded
top_level_nodes (List[ModelEntity]):
list of controlled entities in highest level of model tree
solver (OptSolver):
solver for optimization problem which will be called by Pyomo
Returns:
None
"""
self.nodes = nodes
self.top_level_nodes = top_level_nodes
[docs]
def compute_safe_action(self, action: Dict = None) -> Tuple[Dict, bool, float]:
"""
Checks whether the actions proposed by the controller satisfy the constraints
of the controlled entities and modifies them if necessary.
Args:
action (dict): action suggested by the controller
Returns:
safe_action (dict): verified action
action_corrected (bool): whether the action was corrected or not
correction_penalty (float): penalty for action correction (0 if action was not corrected)
"""
raise NotImplementedError("Safety layers need to implement this method, do not use BaseClass directly.")
[docs]
class ActionReplacementWithOptSafetyLayer(BaseSafetyLayer):
# distance used to determine if the action was corrected or not
DISTANCE_EPS = 1e-5
def __init__(self, penalty: BasePenalty, solver: OptSolver = None):
"""
Action replacement safety layer. Action violating the constraints is replaced
by safe action determined through an optimization method.
Args:
penalty (BasePenalty): class defining the penalty behavior for unsafe actions
solver (OptSolver, optional):
solver for optimization problem, defaults to direct gurobi
https://pyomo.readthedocs.io/en/stable/library_reference/solvers/gurobi_direct.html
Returns:
ActionReplacementWithOptSafetyLayer
"""
super().__init__()
if solver is None:
solver = get_default_solver()
self.penalty = penalty
self.solver = solver
def __del__(self):
"""Called when the class is destroyed, takes care to release Gurobi resources"""
self.solver.close()
[docs]
def compute_safe_action(self, action: Dict = None) -> Tuple[Dict, bool, float]:
"""
Checks whether the actions proposed by the controller satisfy the constraints of the controlled entities and
replaces by pyomo-generated output if necessary.
Args:
action (dict): action suggested by the controller
Returns:
safe_action (dict): verified action
action_corrected (bool): whether the action was corrected or not
correction_penalty (float): penalty for action correction (0 if action was not corrected)
"""
# store action
self.unsafe_action = action
action_feasible = self.is_action_feasible(action)
if action_feasible:
return action, False, 0.0
model = self.prepare_model()
model = self.set_action_in_model(model, action) # initializes optimization with current action
action_distance = self.solve_model(model)
if action_distance is None:
raise EntityError(self.top_level_nodes[0], "Cannot find a safe input")
# second check is needed for the action projection safety layer as it
# avoids having to do the costly feasibility check
action_corrected = action_distance > self.DISTANCE_EPS
safe_action = self.set_action_from_model(model, action)
if not action_corrected:
return safe_action, action_corrected, 0.0
correction_penalty = self.get_penalty(action_distance)
return safe_action, action_corrected, correction_penalty
[docs]
def get_penalty(self, action_distance: float) -> float:
"""
Get penalty depending on the penalty class used.
Args:
action_distance (float): distance between the safe and unsafe action,
only used if penalty is distance based
Returns:
float: computed penalty for the action
"""
# correction penalty (can be used in RL reward function) -> depends on used penalty type
# If the penalty is distance depending, the penalty is computed based on the distance between the unsafe action
# and the safe action. Therefore, the penalty needs the value of the safety objective function.
if isinstance(self.penalty, DistanceDependingPenalty):
return self.penalty.get_correction_penalty(action_distance)
else:
return self.penalty.get_correction_penalty()
[docs]
def is_action_feasible(self, action: dict) -> dict:
"""Check if action is feasible.
Args:
action (dict): action to check
Returns:
bool: True if action is feasible
"""
model = self.prepare_model()
self.set_action_in_model(model, action, fix_values=True)
res = self.solve_model(model)
return res is not None
[docs]
def prepare_model(self) -> ConcreteModel:
"""
Clone model from sys to run local optimization over
Returns:
ConcreteModel: pyomo optimization model, representing
the part of the system under supervision of the safety
"""
# get current system pyomo instance
sys_inst = self.nodes[0].instance
mdl = clone_from_top_level_nodes(self.top_level_nodes, sys_inst)
return mdl
[docs]
def solve_model(self, model: ConcreteModel) -> float:
"""
Finds a feasible action by solving a local model.
Returns the distance of the original unsafe action to the solved model.
Args:
model (ConcreteModel): model to solve
Returns:
float: distance from action or None
"""
results = self.solver.solve(model, warmstart=True)
if results.solver.termination_condition in [
TerminationCondition.infeasible,
TerminationCondition.unbounded,
TerminationCondition.infeasibleOrUnbounded,
]:
with open("infeasible_safety_model.log", "w") as f:
model.pprint(f)
return None
distance_from_action = value(self.action_distance(model, self.unsafe_action))
return distance_from_action
[docs]
def action_distance(self, model: ConcreteModel, action: dict) -> SumExpression:
"""
Euclidean norm between action expressed by the model and the action in dict
Args:
model (ConcreteModel): pyomo optimization model
action (dict): action description
Returns:
SumExpression: distance between the action and the model values expressed
in pyomo class, convert to float by running value() over it
"""
obj_fcn_elements = []
for node in self.nodes:
node_input_ids = node.get_input_ids(model)
if node_input_ids is not None:
# separate the input element name and the node id
el_names = [n_id.split(".")[-1] for n_id in node_input_ids]
global_node_ids = [".".join(n_id.split(".")[:-1]) for n_id in node_input_ids]
# obtain action horizon (for how many time steps does the RL agent predict the action)
action_horizon = list(range(len(action[global_node_ids[0]][el_names[0]])))
# action projection objective function: (a_RL[t] - a_safe[t])^2 for all t in action_horizon
# first step: (a_RL[t] - a_safe[t]) for all t and for all input elements of the current node
node_fcn = [
action[global_node_ids[i]][el_names[i]][t] - node.get_pyomo_element(el_names[i], model)[t]
for t in action_horizon
for i in range(len(el_names))
]
# second step: ()^2
node_fcn = [item**2 for item in node_fcn]
obj_fcn_elements.append(node_fcn)
# flatten list
obj_fcn_elements = [item for sublist in obj_fcn_elements for item in sublist]
# sum over all time steps and all input elements of all nodes
obj = quicksum(obj_fcn_elements)
return obj
[docs]
def set_action_from_model(self, model: ConcreteModel, action: dict) -> dict:
"""Corrects an action according to a model.
Args:
model (ConcreteModel): pyomo model to ge the values from
action (dict): action to correct
Returns:
dict: safe action according to the model
"""
safe_action = deepcopy(action)
node_actions = {}
for node in self.nodes:
node_action = node.get_inputs(model)
if node_action is not None:
node_actions[node.id] = node_action
for node_id, actions in safe_action.items():
for el_id, el_action in actions.items():
for i in range(el_action.shape[0]):
safe_action[node_id][el_id][i] = node_actions[node_id][el_id][i]
return safe_action
[docs]
def set_action_in_model(self, model: ConcreteModel, action: dict, fix_values: bool = False) -> ConcreteModel:
"""
Sets the model values to the action to initialize the optimization.
If fix values is True, the model values will be fixed, which is useful
to check the feasibility of the given action.
Args:
model (ConcreteModel): pyomo optimization model
action (dict): action to set into the model
fix_values (bool): optionally fixes the values for the optimization problem
Returns:
ConcreteModel: model with values set
"""
for node in self.nodes:
node_input_ids = node.get_input_ids(model)
if node_input_ids is not None:
# separate the input element name and the node id
el_names = [n_id.split(".")[-1] for n_id in node_input_ids]
global_node_ids = [".".join(n_id.split(".")[:-1]) for n_id in node_input_ids]
for i in range(len(el_names)):
node.set_value(model, el_names[i], action[global_node_ids[i]][el_names[i]], fix_value=fix_values)
return model
[docs]
class ActionProjectionSafetyLayer(ActionReplacementWithOptSafetyLayer):
def __init__(self, penalty: BasePenalty, solver: OptSolver = None):
"""
Computes safe action by minimizing the
distance between the RL action and the safe
action while also satisfying
constraints.
Args:
penalty (BasePenalty): class defining the penalty behavior for unsafe actions
solver (OptSolver, optional):
solver for optimization problem, defaults to direct gurobi
https://pyomo.readthedocs.io/en/stable/library_reference/solvers/gurobi_direct.html
Returns:
ActionReplacementSafetyLayer
"""
super().__init__(penalty, solver)
[docs]
def prepare_model(self) -> ConcreteModel:
"""Prepare model. Additionally projection criterion is added
Returns:
ConcreteModel: pyomo optimization model, representing
the part of the system under supervision of the safety
"""
model = super().prepare_model()
model.safety_obj = Objective(expr=lambda model: self.action_distance(model, self.unsafe_action))
return model
[docs]
def is_action_feasible(self, action) -> bool:
"""
As feasibility check is quite costly, we can skip for projection layer
as we check action distance to the original action after optimization
"""
return False