Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grid2op/Backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __init__(self,
self._missing_detachment_support_info : bool = True
self.detachment_is_allowed : bool = DEFAULT_ALLOW_DETACHMENT


def can_handle_more_than_2_busbar(self):
"""
.. versionadded:: 1.10.0
Expand Down
136 changes: 136 additions & 0 deletions grid2op/Backend/protectionScheme.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import copy
import logging
from typing import Tuple, Union, Any, List, Optional
import numpy as np

from grid2op.dtypes import dt_int
from grid2op.Backend.backend import Backend
from grid2op.Parameters import Parameters
from grid2op.Exceptions import Grid2OpException
from grid2op.Backend.thermalLimits import ThermalLimits

class DefaultProtection:
"""
Advanced class to manage network protections and disconnections.
"""

def __init__(
self,
backend: Backend,
parameters: Optional[Parameters] = None,
thermal_limits: Optional[ThermalLimits] = None,
is_dc: bool = False,
logger: Optional[logging.Logger] = None,
):
"""
Initializes the network state with customizable protections.
"""
self.backend = backend
self._parameters = copy.deepcopy(parameters) if parameters else Parameters()
self._validate_input(self.backend, self._parameters)

self.is_dc = is_dc
self.thermal_limits = thermal_limits

self._thermal_limit_a = self.thermal_limits.limits if self.thermal_limits else None
self.backend.thermal_limit_a = self._thermal_limit_a

self._hard_overflow_threshold = self._get_value_from_parameters("HARD_OVERFLOW_THRESHOLD")
self._soft_overflow_threshold = self._get_value_from_parameters("SOFT_OVERFLOW_THRESHOLD")
self._nb_timestep_overflow_allowed = self._get_value_from_parameters("NB_TIMESTEP_OVERFLOW_ALLOWED")
self._no_overflow_disconnection = self._get_value_from_parameters("NO_OVERFLOW_DISCONNECTION")

self.disconnected_during_cf = np.full(self.thermal_limits.n_line, fill_value=-1, dtype=dt_int)
self._timestep_overflow = np.zeros(self.thermal_limits.n_line, dtype=dt_int)
self.conv_ = self._run_power_flow()
self.infos: List[str] = []

if logger is None:
self.logger = logging.getLogger(__name__)
self.logger.disabled = True
else:
self.logger: logging.Logger = logger.getChild("grid2op_BaseEnv")

def _validate_input(self, backend: Backend, parameters: Optional[Parameters]) -> None:
if not isinstance(backend, Backend):
raise Grid2OpException(f"Argument 'backend' must be of type 'Backend', received: {type(backend)}")
if parameters and not isinstance(parameters, Parameters):
raise Grid2OpException(f"Argument 'parameters' must be of type 'Parameters', received: {type(parameters)}")

def _get_value_from_parameters(self, parameter_name: str) -> Any:
return getattr(self._parameters, parameter_name, None)

def _run_power_flow(self) -> Optional[Exception]:
try:
return self.backend._runpf_with_diverging_exception(self.is_dc)
except Exception as e:
if self.logger is not None:
self.logger.error(
f"Power flow error: {e}"
)
return e

def _update_overflows(self, lines_flows: np.ndarray) -> np.ndarray:
if self._thermal_limit_a is None:
if self.logger is not None:
self.logger.error(
"Thermal limits must be provided for overflow calculations."
)
raise ValueError("Thermal limits must be provided for overflow calculations.")

lines_status = self.backend.get_line_status() # self._thermal_limit_a remains fixed. self._soft_overflow_threshold = 1
is_overflowing = (lines_flows >= self._thermal_limit_a * self._soft_overflow_threshold) & lines_status
self._timestep_overflow[is_overflowing] += 1
# self._hard_overflow_threshold = 1.5
exceeds_hard_limit = (lines_flows > self._thermal_limit_a * self._hard_overflow_threshold) & lines_status
exceeds_allowed_time = self._timestep_overflow > self._nb_timestep_overflow_allowed

lines_to_disconnect = exceeds_hard_limit | (exceeds_allowed_time & lines_status)
return lines_to_disconnect

def _disconnect_lines(self, lines_to_disconnect: np.ndarray, timestep: int) -> None:
for line_idx in np.where(lines_to_disconnect)[0]:
self.backend._disconnect_line(line_idx)
self.disconnected_during_cf[line_idx] = timestep
if self.logger is not None:
self.logger.warning(f"Line {line_idx} disconnected at timestep {timestep}.")

def next_grid_state(self) -> Tuple[np.ndarray, List[Any], Union[None, Exception]]:
try:
timestep = 0
while True:
power_flow_result = self._run_power_flow()
if power_flow_result:
return self.disconnected_during_cf, self.infos, power_flow_result

lines_flows = self.backend.get_line_flow()
lines_to_disconnect = self._update_overflows(lines_flows)

if not lines_to_disconnect.any():
break

self._disconnect_lines(lines_to_disconnect, timestep)
timestep += 1

return self.disconnected_during_cf, self.infos, None

except Exception as e:
if self.logger is not None:
self.logger.exception("Unexpected error in calculating the network state.")
return self.disconnected_during_cf, self.infos, e

class NoProtection(DefaultProtection):
"""
Class that disables overflow protections while keeping the structure of DefaultProtection.
"""
def __init__(self, backend: Backend, thermal_limits: ThermalLimits, is_dc: bool = False):
super().__init__(backend, parameters=None, thermal_limits=thermal_limits, is_dc=is_dc)

def next_grid_state(self) -> Tuple[np.ndarray, List[Any], None]:
"""
Ignores protections and returns the network state without disconnections.
"""
return self.disconnected_during_cf, self.infos, self.conv_

class BlablaProtection:
pass
220 changes: 220 additions & 0 deletions grid2op/Backend/thermalLimits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
from typing import Union, List, Optional, Dict
try:
from typing import Self
except ImportError:
from typing_extensions import Self

import logging
import numpy as np
import copy

import grid2op
from grid2op.dtypes import dt_float
from grid2op.Exceptions import Grid2OpException

class ThermalLimits:
"""
Class for managing the thermal limits of power grid lines.
"""

def __init__(
self,
_thermal_limit_a: Optional[np.ndarray] = None,
line_names: Optional[List[str]] = None,
n_line: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
"""
Initializes the thermal limits manager.

:param thermal_limits: Optional[np.ndarray]
Array of thermal limits for each power line. Must have the same length as the number of lines.
:param line_names: Optional[List[str]]
List of power line names.
:param n_line: Optional[int]
Number of lines (can be passed explicitly or inferred from `thermal_limits` or `line_names`).

:raises ValueError:
If neither `thermal_limits` nor `n_line` and `line_names` are provided.
"""
if _thermal_limit_a is None and (n_line is None and line_names is None):
raise ValueError("Must provide thermal_limits or both n_line and line_names.")

self._thermal_limit_a = _thermal_limit_a
self._n_line = n_line
self._name_line = line_names

if logger is None:
self.logger = logging.getLogger(__name__)
self.logger.disabled = True
else:
self.logger: logging.Logger = logger.getChild("grid2op_BaseEnv")

@property
def n_line(self) -> int:
return self._n_line

@n_line.setter
def n_line(self, new_n_line: int) -> None:
if new_n_line <= 0:
raise ValueError("Number of lines must be a positive integer.")
self._n_line = new_n_line
if self.logger is not None:
self.logger.info(f"Number of lines updated to {self._n_line}.")

@property
def name_line(self) -> Union[List[str], np.ndarray]:
return self._name_line

@name_line.setter
def name_line(self, new_name_line: Union[List[str], np.ndarray]) -> None:
if isinstance(new_name_line, np.ndarray):
if not np.all([isinstance(name, str) for name in new_name_line]):
raise ValueError("All elements in name_line must be strings.")
elif isinstance(new_name_line, list):
if not all(isinstance(name, str) for name in new_name_line):
raise ValueError("All elements in name_line must be strings.")
else:
raise ValueError("Line names must be a list or numpy array of non-empty strings.")

if self._n_line is not None and len(new_name_line) != self._n_line:
raise ValueError("Length of name list must match the number of lines.")

self._name_line = new_name_line
if self.logger is not None:
self.logger.info("Power line names updated")

@property
def limits(self) -> np.ndarray:
"""
Gets the current thermal limits of the power lines.

:return: np.ndarray
The array containing thermal limits for each power line.
"""
return self._thermal_limit_a

@limits.setter
def limits(self, new_limits: Union[np.ndarray, Dict[str, float]]):
"""
Sets new thermal limits.

:param new_limits: Union[np.ndarray, Dict[str, float]]
Either a numpy array or a dictionary mapping line names to new thermal limits.

:raises ValueError:
If the new limits array size does not match the number of lines.
:raises Grid2OpException:
If invalid power line names are provided in the dictionary.
If the new thermal limit values are invalid (non-positive or non-convertible).
:raises TypeError:
If the input type is not an array or dictionary.
"""
if isinstance(new_limits, np.ndarray):
if new_limits.shape[0] == self.n_line:
self._thermal_limit_a = 1.0 * new_limits.astype(dt_float)
elif isinstance(new_limits, dict):
for el in new_limits.keys():
if not el in self.name_line:
raise Grid2OpException(
'You asked to modify the thermal limit of powerline named "{}" that is not '
"on the grid. Names of powerlines are {}".format(
el, self.name_line
)
)
for i, el in enumerate(self.name_line):
if el in new_limits:
try:
tmp = dt_float(new_limits[el])
except Exception as exc_:
raise Grid2OpException(
'Impossible to convert data ({}) for powerline named "{}" into float '
"values".format(new_limits[el], el)
) from exc_
if tmp <= 0:
raise Grid2OpException(
'New thermal limit for powerlines "{}" is not positive ({})'
"".format(el, tmp)
)
self._thermal_limit_a[i] = tmp

def env_limits(self, thermal_limit):
"""
"""
if isinstance(thermal_limit, dict):
tmp = np.full(self.n_line, fill_value=np.NaN, dtype=dt_float)
for key, val in thermal_limit.items():
if key not in self.name_line:
raise Grid2OpException(
f"When setting a thermal limit with a dictionary, the keys should be line "
f"names. We found: {key} which is not a line name. The names of the "
f"powerlines are {self.name_line}"
)
ind_line = (self.name_line == key).nonzero()[0][0]
if np.isfinite(tmp[ind_line]):
raise Grid2OpException(
f"Humm, there is a really strange bug, some lines are set twice."
)
try:
val_fl = float(val)
except Exception as exc_:
raise Grid2OpException(
f"When setting thermal limit with a dictionary, the keys should be "
f"the values of the thermal limit (in amps) you provided something that "
f'cannot be converted to a float. Error was "{exc_}".'
)
tmp[ind_line] = val_fl

elif isinstance(thermal_limit, (np.ndarray, list)):
try:
tmp = np.array(thermal_limit).flatten().astype(dt_float)
except Exception as exc_:
raise Grid2OpException(
f"Impossible to convert the vector as input into a 1d numpy float array. "
f"Error was: \n {exc_}"
)
if tmp.shape[0] != self.n_line:
raise Grid2OpException(
"Attempt to set thermal limit on {} powerlines while there are {}"
"on the grid".format(tmp.shape[0], self.n_line)
)
if (~np.isfinite(tmp)).any():
raise Grid2OpException(
"Impossible to use non finite value for thermal limits."
)
else:
raise Grid2OpException(
f"You can only set the thermal limits of the environment with a dictionary (in that "
f"case the keys are the line names, and the values the thermal limits) or with "
f"a numpy array that has as many components of the number of powerlines on "
f'the grid. You provided something with type "{type(thermal_limit)}" which '
f"is not supported."
)

self._thermal_limit_a = tmp
if self.logger is not None:
self.logger.info("Env thermal limits successfully set.")

def update_limits_from_vector(self, thermal_limit_a: np.ndarray) -> None:
"""
Updates the thermal limits using a numpy array.

:param thermal_limit_a: np.ndarray
The new array of thermal limits (in Amperes).
"""
thermal_limit_a = np.array(thermal_limit_a).astype(dt_float)
self._thermal_limit_a = thermal_limit_a
if self.logger is not None:
self.logger.info("Thermal limits updated from vector.")

def update_limits(self, env : "grid2op.Environment.BaseEnv") -> None:
pass

def copy(self) -> Self:
"""
Creates a deep copy of the current ThermalLimits instance.

:return: ThermalLimits
A new instance with the same attributes.
"""
return copy.deepcopy(self)
9 changes: 4 additions & 5 deletions grid2op/Environment/_obsEnv.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ def _init_backend(
self._game_rules.initialize(self)
self._legalActClass = legalActClass

# self._action_space = self._do_nothing
self.backend.set_thermal_limit(self._thermal_limit_a)

# # self._action_space = self._do_nothing
self.ts_manager.limits = self._thermal_limit_a # old code line : self.backend.set_thermal_limit(self._thermal_limit_a)
from grid2op.Observation import ObservationSpace
from grid2op.Reward import FlatReward
ob_sp_cls = ObservationSpace.init_grid(type(backend), _local_dir_cls=self._local_dir_cls)
Expand All @@ -207,7 +207,6 @@ def _init_backend(
_local_dir_cls=self._local_dir_cls
)
self._observationClass = self._observation_space.subtype # not used anyway

# create the opponent
self._create_opponent()

Expand Down Expand Up @@ -555,4 +554,4 @@ def close(self):
]:
if hasattr(self, attr_nm):
delattr(self, attr_nm)
setattr(self, attr_nm, None)
setattr(self, attr_nm, None)
Loading