Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/bsk_rl/gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ def _generate_world_args(self) -> None:
}

def _randomize_time_limit(self) -> None:
self.time_limit = self.time_limit_generator()
time_limit = self.time_limit_generator()
time_limit = np.ceil(time_limit / self.sim_rate) * self.sim_rate
self.time_limit = time_limit

def reset(
self,
Expand Down Expand Up @@ -753,6 +755,15 @@ def step(

self.newly_dead = list(set(previous_alive) - set(self.agents))

for satellite in self.newly_dead:
for attr in [
"_timed_terminal_event_name",
"_image_event_name",
]:
event_name = getattr(satellite, attr, None)
if event_name is not None:
self.simulator.delete_event(event_name)

observation = self._get_obs()
reward = self._get_reward()
terminated = self._get_terminated()
Expand Down
33 changes: 19 additions & 14 deletions src/bsk_rl/sats/access_satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,20 +572,22 @@ def _update_image_event(self, target: "Target") -> None:
data_index
]
)

def side_effect(sim):
self.logger.info(f"imaged {target}")
self.imaged += 1
self.requires_retasking = True
self.remove_imaging_line()

self.simulator.createNewEvent(
self._image_event_name,
macros.sec2nano(self.fsw.fsw_rate),
True,
[
f"self.dynamics_list['{self.name}'].storageUnit.storageUnitDataOutMsg.read()"
+ f".storedData[{data_index}] > {current_data_level}"
],
[
self._info_command(f"imaged {target}"),
self._satellite_command + ".imaged += 1",
self._satellite_command + ".requires_retasking = True",
self._satellite_command + ".remove_imaging_line()",
],
conditionFunction=lambda sim: self.dynamics.storageUnit.storageUnitDataOutMsg.read().storedData[
data_index
]
> current_data_level,
actionFunction=side_effect,
terminal=self.variable_interval,
)
else:
Expand Down Expand Up @@ -645,13 +647,16 @@ def enable_target_window(
)
if max_duration is None:
max_duration = 1e9

def side_effect(sim):
if np.isclose(sim.sim_time, next_window[1], atol=1e-9):
self.missed += 1
self.remove_imaging_line()

self.update_timed_terminal_event(
min(next_window[1], self.simulator.sim_time + max_duration),
info=f"for {target} window",
extra_actions=[
self._satellite_command + ".missed += 1",
self._satellite_command + ".remove_imaging_line()",
],
extra_actions=side_effect,
)

def task_target_for_imaging(
Expand Down
26 changes: 18 additions & 8 deletions src/bsk_rl/sats/satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inspect
import logging
from abc import ABC
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Union
from weakref import proxy

import numpy as np
Expand Down Expand Up @@ -328,7 +328,10 @@ def log_warning(self, warning: Any) -> None:
self.logger.warning(f"{warning}")

def update_timed_terminal_event(
self, t_close: float, info: str = "", extra_actions: list[str] = []
self,
t_close: float,
info: str = "",
extra_actions: Optional[Union[list[str], callable]] = None,
) -> None:
"""Create a simulator event that stops the simulation a certain time.

Expand All @@ -344,16 +347,23 @@ def update_timed_terminal_event(
self._timed_terminal_event_name = valid_func_name(
f"timed_terminal_{t_close}_{self.name}"
)

def side_effect(sim):
self.logger.info(f"timed termination at {t_close:.1f} " + info)
self.requires_retasking = True
if extra_actions is not None:
if callable(extra_actions):
extra_actions(sim)
elif isinstance(extra_actions, list):
for action in extra_actions:
exec(action, {"self": sim})

self.simulator.createNewEvent(
self._timed_terminal_event_name,
macros.sec2nano(self.simulator.sim_rate),
True,
[f"self.TotalSim.CurrentNanos * {macros.NANO2SEC} >= {t_close}"],
[
self._info_command(f"timed termination at {t_close:.1f} " + info),
self._satellite_command + ".requires_retasking = True",
]
+ extra_actions,
conditionFunction=lambda sim: sim.sim_time >= t_close,
actionFunction=side_effect,
terminal=self.variable_interval,
)
self.simulator.eventMap[self._timed_terminal_event_name].eventActive = True
Expand Down
74 changes: 46 additions & 28 deletions src/bsk_rl/sim/dyn/relative_motion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Dynamics models concerning the relative motion of spacecraft."""

import numpy as np
from Basilisk.simulation import spacecraftLocation
from Basilisk.utilities import macros

Expand Down Expand Up @@ -93,28 +94,40 @@ def setup_conjunctions(self, conjunction_radius: float, **kwargs) -> None:

for sat_dyn in self.simulator.dynamics_list.values():
if sat_dyn != self and isinstance(sat_dyn, ConjunctionDynModel):

def condition(sim, sat_dyn=sat_dyn):
distance = np.linalg.norm(
np.array(self.satellite.dynamics.r_BN_N)
- np.array(sat_dyn.satellite.dynamics.r_BN_N)
)
keepout = (
self.satellite.dynamics.conjunction_radius
+ sat_dyn.satellite.dynamics.conjunction_radius
)
return distance <= keepout

def side_effect(sim):
self.satellite.logger.info(
f"collided with {sat_dyn.satellite.name}"
)
sat_dyn.satellite.logger.info(
f"collided with {self.satellite.name}"
)
self.satellite.dynamics.conjunctions.append(sat_dyn.satellite)
sat_dyn.satellite.dynamics.conjunctions.append(self.satellite)
if sim.sim_time == 0:
self.satellite.logger.warning(
"Collision occurred at t=0, may incorrectly report failure type"
)

self.simulator.createNewEvent(
valid_func_name(
f"conjunction_{self.satellite.name}_{sat_dyn.satellite.name}"
),
macros.sec2nano(self.simulator.sim_rate),
True,
[
f"np.linalg.norm(np.array({self.satellite._satellite_command}.dynamics.r_BN_N) - np.array({sat_dyn.satellite._satellite_command}.dynamics.r_BN_N))"
+ " <= "
+ f"{self.satellite._satellite_command}.dynamics.conjunction_radius + {sat_dyn.satellite._satellite_command}.dynamics.conjunction_radius"
],
[
self.satellite._info_command(
f"collided with {sat_dyn.satellite.name}"
),
sat_dyn.satellite._info_command(
f"collided with {self.satellite.name}"
),
f"{self.satellite._satellite_command}.dynamics.conjunctions.append({sat_dyn.satellite._satellite_command})",
f"{sat_dyn.satellite._satellite_command}.dynamics.conjunctions.append({self.satellite._satellite_command})",
f"[{self.satellite._satellite_command}.logger.warning('Collision occurred at t=0, may incorrectly report failure type') if self.sim_time == 0 else None]",
],
conditionFunction=condition,
actionFunction=side_effect,
terminal=True,
)

Expand Down Expand Up @@ -158,22 +171,27 @@ def setup_range(self, max_range_radius: float, chief_name: str, **kwargs) -> Non
)
return

self.chief = self.simulator.get_satellite(self.chief_name)

def condition(sim):
distance = np.linalg.norm(
np.array(self.satellite.dynamics.r_BN_N)
- np.array(self.chief.dynamics.r_BN_N)
)
return distance >= self.max_range_radius

def side_effect(sim):
self.satellite.logger.info(
f"Exceeded maximum range of {max_range_radius} m from {self.chief_name}"
)
self.out_of_ranges.append(self.chief)

self.simulator.createNewEvent(
valid_func_name(f"range_{self.satellite.name}_{self.chief_name}"),
macros.sec2nano(self.simulator.sim_rate),
True,
[
f"np.linalg.norm(np.array({self.satellite._satellite_command}.dynamics.r_BN_N)"
+ f"- np.array(self.get_satellite('{self.chief_name}').dynamics.r_BN_N))"
+ " >= "
+ f"{self.satellite._satellite_command}.dynamics.max_range_radius"
],
[
self.satellite._info_command(
f"Exceeded maximum range of {max_range_radius} m from {self.chief_name}"
),
f"{self.satellite._satellite_command}.dynamics.out_of_ranges.append(self.get_satellite('{self.chief_name}'))",
],
conditionFunction=condition,
actionFunction=side_effect,
terminal=True,
)

Expand Down
8 changes: 4 additions & 4 deletions src/bsk_rl/sim/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ def run(self) -> None:
if "max_step_duration" in self.eventMap:
self.delete_event("max_step_duration")

step_end_time = self.sim_time + self.max_step_duration

self.createNewEvent(
"max_step_duration",
mc.sec2nano(self.sim_rate),
True,
[
f"self.TotalSim.CurrentNanos * {mc.NANO2SEC} >= {self.sim_time + self.max_step_duration}"
],
["self.logger.info('Max step duration reached')"],
conditionFunction=lambda sim: sim.sim_time >= step_end_time,
actionFunction=lambda sim: sim.logger.info("Max step duration reached"),
terminal=True,
)
self.ConfigureStopTime(mc.sec2nano(min(self.time_limit, 2**31)))
Expand Down