diff --git a/src/bsk_rl/gym.py b/src/bsk_rl/gym.py index 8009cd20..c2f34a37 100644 --- a/src/bsk_rl/gym.py +++ b/src/bsk_rl/gym.py @@ -251,7 +251,9 @@ def _generate_world_args(self) -> None: } def _randomize_time_limit(self) -> None: - self.time_limit = self.time_limit_generator() + time_limit = self.time_limit_generator() + time_limit = np.ceil(time_limit / self.sim_rate) * self.sim_rate + self.time_limit = time_limit def reset( self, @@ -753,6 +755,15 @@ def step( self.newly_dead = list(set(previous_alive) - set(self.agents)) + for satellite in self.newly_dead: + for attr in [ + "_timed_terminal_event_name", + "_image_event_name", + ]: + event_name = getattr(satellite, attr, None) + if event_name is not None: + self.simulator.delete_event(event_name) + observation = self._get_obs() reward = self._get_reward() terminated = self._get_terminated() diff --git a/src/bsk_rl/sats/access_satellite.py b/src/bsk_rl/sats/access_satellite.py index 9cd342a1..782f1ad9 100644 --- a/src/bsk_rl/sats/access_satellite.py +++ b/src/bsk_rl/sats/access_satellite.py @@ -572,20 +572,22 @@ def _update_image_event(self, target: "Target") -> None: data_index ] ) + + def side_effect(sim): + self.logger.info(f"imaged {target}") + self.imaged += 1 + self.requires_retasking = True + self.remove_imaging_line() + self.simulator.createNewEvent( self._image_event_name, macros.sec2nano(self.fsw.fsw_rate), True, - [ - f"self.dynamics_list['{self.name}'].storageUnit.storageUnitDataOutMsg.read()" - + f".storedData[{data_index}] > {current_data_level}" - ], - [ - self._info_command(f"imaged {target}"), - self._satellite_command + ".imaged += 1", - self._satellite_command + ".requires_retasking = True", - self._satellite_command + ".remove_imaging_line()", - ], + conditionFunction=lambda sim: self.dynamics.storageUnit.storageUnitDataOutMsg.read().storedData[ + data_index + ] + > current_data_level, + actionFunction=side_effect, terminal=self.variable_interval, ) else: @@ -645,13 +647,16 @@ def enable_target_window( ) if max_duration is None: max_duration = 1e9 + + def side_effect(sim): + if np.isclose(sim.sim_time, next_window[1], atol=1e-9): + self.missed += 1 + self.remove_imaging_line() + self.update_timed_terminal_event( min(next_window[1], self.simulator.sim_time + max_duration), info=f"for {target} window", - extra_actions=[ - self._satellite_command + ".missed += 1", - self._satellite_command + ".remove_imaging_line()", - ], + extra_actions=side_effect, ) def task_target_for_imaging( diff --git a/src/bsk_rl/sats/satellite.py b/src/bsk_rl/sats/satellite.py index c0d206a0..0c5f827c 100644 --- a/src/bsk_rl/sats/satellite.py +++ b/src/bsk_rl/sats/satellite.py @@ -3,7 +3,7 @@ import inspect import logging from abc import ABC -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Union from weakref import proxy import numpy as np @@ -328,7 +328,10 @@ def log_warning(self, warning: Any) -> None: self.logger.warning(f"{warning}") def update_timed_terminal_event( - self, t_close: float, info: str = "", extra_actions: list[str] = [] + self, + t_close: float, + info: str = "", + extra_actions: Optional[Union[list[str], callable]] = None, ) -> None: """Create a simulator event that stops the simulation a certain time. @@ -344,16 +347,23 @@ def update_timed_terminal_event( self._timed_terminal_event_name = valid_func_name( f"timed_terminal_{t_close}_{self.name}" ) + + def side_effect(sim): + self.logger.info(f"timed termination at {t_close:.1f} " + info) + self.requires_retasking = True + if extra_actions is not None: + if callable(extra_actions): + extra_actions(sim) + elif isinstance(extra_actions, list): + for action in extra_actions: + exec(action, {"self": sim}) + self.simulator.createNewEvent( self._timed_terminal_event_name, macros.sec2nano(self.simulator.sim_rate), True, - [f"self.TotalSim.CurrentNanos * {macros.NANO2SEC} >= {t_close}"], - [ - self._info_command(f"timed termination at {t_close:.1f} " + info), - self._satellite_command + ".requires_retasking = True", - ] - + extra_actions, + conditionFunction=lambda sim: sim.sim_time >= t_close, + actionFunction=side_effect, terminal=self.variable_interval, ) self.simulator.eventMap[self._timed_terminal_event_name].eventActive = True diff --git a/src/bsk_rl/sim/dyn/relative_motion.py b/src/bsk_rl/sim/dyn/relative_motion.py index 1e8d3f6e..bd3a7bc5 100644 --- a/src/bsk_rl/sim/dyn/relative_motion.py +++ b/src/bsk_rl/sim/dyn/relative_motion.py @@ -1,5 +1,6 @@ """Dynamics models concerning the relative motion of spacecraft.""" +import numpy as np from Basilisk.simulation import spacecraftLocation from Basilisk.utilities import macros @@ -93,28 +94,40 @@ def setup_conjunctions(self, conjunction_radius: float, **kwargs) -> None: for sat_dyn in self.simulator.dynamics_list.values(): if sat_dyn != self and isinstance(sat_dyn, ConjunctionDynModel): + + def condition(sim, sat_dyn=sat_dyn): + distance = np.linalg.norm( + np.array(self.satellite.dynamics.r_BN_N) + - np.array(sat_dyn.satellite.dynamics.r_BN_N) + ) + keepout = ( + self.satellite.dynamics.conjunction_radius + + sat_dyn.satellite.dynamics.conjunction_radius + ) + return distance <= keepout + + def side_effect(sim): + self.satellite.logger.info( + f"collided with {sat_dyn.satellite.name}" + ) + sat_dyn.satellite.logger.info( + f"collided with {self.satellite.name}" + ) + self.satellite.dynamics.conjunctions.append(sat_dyn.satellite) + sat_dyn.satellite.dynamics.conjunctions.append(self.satellite) + if sim.sim_time == 0: + self.satellite.logger.warning( + "Collision occurred at t=0, may incorrectly report failure type" + ) + self.simulator.createNewEvent( valid_func_name( f"conjunction_{self.satellite.name}_{sat_dyn.satellite.name}" ), macros.sec2nano(self.simulator.sim_rate), True, - [ - f"np.linalg.norm(np.array({self.satellite._satellite_command}.dynamics.r_BN_N) - np.array({sat_dyn.satellite._satellite_command}.dynamics.r_BN_N))" - + " <= " - + f"{self.satellite._satellite_command}.dynamics.conjunction_radius + {sat_dyn.satellite._satellite_command}.dynamics.conjunction_radius" - ], - [ - self.satellite._info_command( - f"collided with {sat_dyn.satellite.name}" - ), - sat_dyn.satellite._info_command( - f"collided with {self.satellite.name}" - ), - f"{self.satellite._satellite_command}.dynamics.conjunctions.append({sat_dyn.satellite._satellite_command})", - f"{sat_dyn.satellite._satellite_command}.dynamics.conjunctions.append({self.satellite._satellite_command})", - f"[{self.satellite._satellite_command}.logger.warning('Collision occurred at t=0, may incorrectly report failure type') if self.sim_time == 0 else None]", - ], + conditionFunction=condition, + actionFunction=side_effect, terminal=True, ) @@ -158,22 +171,27 @@ def setup_range(self, max_range_radius: float, chief_name: str, **kwargs) -> Non ) return + self.chief = self.simulator.get_satellite(self.chief_name) + + def condition(sim): + distance = np.linalg.norm( + np.array(self.satellite.dynamics.r_BN_N) + - np.array(self.chief.dynamics.r_BN_N) + ) + return distance >= self.max_range_radius + + def side_effect(sim): + self.satellite.logger.info( + f"Exceeded maximum range of {max_range_radius} m from {self.chief_name}" + ) + self.out_of_ranges.append(self.chief) + self.simulator.createNewEvent( valid_func_name(f"range_{self.satellite.name}_{self.chief_name}"), macros.sec2nano(self.simulator.sim_rate), True, - [ - f"np.linalg.norm(np.array({self.satellite._satellite_command}.dynamics.r_BN_N)" - + f"- np.array(self.get_satellite('{self.chief_name}').dynamics.r_BN_N))" - + " >= " - + f"{self.satellite._satellite_command}.dynamics.max_range_radius" - ], - [ - self.satellite._info_command( - f"Exceeded maximum range of {max_range_radius} m from {self.chief_name}" - ), - f"{self.satellite._satellite_command}.dynamics.out_of_ranges.append(self.get_satellite('{self.chief_name}'))", - ], + conditionFunction=condition, + actionFunction=side_effect, terminal=True, ) diff --git a/src/bsk_rl/sim/simulator.py b/src/bsk_rl/sim/simulator.py index 18b6fb2d..363995b9 100644 --- a/src/bsk_rl/sim/simulator.py +++ b/src/bsk_rl/sim/simulator.py @@ -138,14 +138,14 @@ def run(self) -> None: if "max_step_duration" in self.eventMap: self.delete_event("max_step_duration") + step_end_time = self.sim_time + self.max_step_duration + self.createNewEvent( "max_step_duration", mc.sec2nano(self.sim_rate), True, - [ - f"self.TotalSim.CurrentNanos * {mc.NANO2SEC} >= {self.sim_time + self.max_step_duration}" - ], - ["self.logger.info('Max step duration reached')"], + conditionFunction=lambda sim: sim.sim_time >= step_end_time, + actionFunction=lambda sim: sim.logger.info("Max step duration reached"), terminal=True, ) self.ConfigureStopTime(mc.sec2nano(min(self.time_limit, 2**31)))