Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from rclpy.node import Node
from std_msgs.msg import Bool
from std_srvs.srv import SetBool, Trigger
from task_planning.utils.other_utils import singleton

from task_planning.utils.other_utils import get_robot_name
from task_planning.utils.other_utils import get_robot_name, singleton

logger = get_logger('controls_interface')

Expand Down
11 changes: 5 additions & 6 deletions onboard/src/task_planning/task_planning/interface/cv.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
from enum import Enum
from pathlib import Path
from typing import ClassVar
import time

import numpy as np
import resource_retriever as rr
Expand All @@ -22,6 +22,8 @@ class CVObjectType(Enum):
BIN_BLUE = 'bin_blue'
BIN_RED = 'bin_red'
BIN_WHOLE = 'bin_whole'
BIN_PINK_FRONT = 'bin_pink_front'
BIN_PINK_BOTTOM = 'bin_pink_bottom'
BUOY = 'buoy'
GATE_REEF_SHARK = 'b'
GATE_SAWFISH = 'c'
Expand All @@ -36,8 +38,6 @@ class CVObjectType(Enum):
TORPEDO_LARGEST_TARGET = 'torpedo_largest_target'
TORPEDO_LOWER_TARGET = 'h'
TORPEDO_UPPER_TARGET = 'g'
BIN_PINK_FRONT = 'bin_pink_front'
BIN_PINK_BOTTOM = 'bin_pink_bottom'


@singleton
Expand All @@ -62,7 +62,6 @@ class CV:

MODELS_PATH = 'package://cv/models/depthai_models.yaml'
CV_CAMERA = 'front'
# TODO: add other CV models here as defined in depthai_models.yaml. Modify the Enum strings correspondingly.
CV_MODELS: ClassVar[list[str]] = ['2025_torpedo']

# Need to see more than TORPEDO_BANNER_RATE_THRESHOLD messages per second
Expand All @@ -72,6 +71,8 @@ class CV:
CVObjectType.BUOY: '/cv/front_usb/buoy/bounding_box',
CVObjectType.BIN_BLUE: '/cv/bottom/bin_blue/bounding_box',
CVObjectType.BIN_RED: '/cv/bottom/bin_red/bounding_box',
CVObjectType.BIN_PINK_FRONT: 'cv/front_usb/bin_pink_front/bounding_box',
CVObjectType.BIN_PINK_BOTTOM: 'cv/front_usb/bin_pink_bottom/bounding_box',
CVObjectType.LANE_MARKER: '/cv/bottom/lane_marker/bounding_box',
CVObjectType.PATH_MARKER: '/cv/bottom/path_marker/bounding_box',
CVObjectType.TORPEDO_BANNER: '/cv/front/torpedo_banner',
Expand All @@ -80,8 +81,6 @@ class CV:
CVObjectType.TORPEDO_REEF_SHARK_TARGET: '/cv/front_usb/torpedo_reef_shark_target/bounding_box',
CVObjectType.TORPEDO_SAWFISH_TARGET: '/cv/front_usb/torpedo_sawfish_target/bounding_box',
CVObjectType.TORPEDO_LARGEST_TARGET: '/cv/front_usb/torpedo_largest_target/bounding_box',
CVObjectType.BIN_PINK_FRONT: 'cv/front_usb/bin_pink_front/bounding_box',
CVObjectType.BIN_PINK_BOTTOM: 'cv/front_usb/bin_pink_bottom/bounding_box',
}

DISTANCE_TOPICS: ClassVar[dict[CVObjectType, str]] = {
Expand Down
1 change: 0 additions & 1 deletion onboard/src/task_planning/task_planning/interface/ivc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from rclpy.node import Node
from rclpy.task import Future
from rclpy.time import Time

from task_planning.utils.other_utils import singleton

logger = get_logger('ivc_interface')
Expand Down
3 changes: 1 addition & 2 deletions onboard/src/task_planning/task_planning/interface/servos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from rclpy.logging import get_logger
from rclpy.node import Node
from rclpy.task import Future
from task_planning.utils.other_utils import singleton
from task_planning.utils.other_utils import get_robot_name
from task_planning.utils.other_utils import get_robot_name, singleton

logger = get_logger('servos_interface')

Expand Down
69 changes: 68 additions & 1 deletion onboard/src/task_planning/task_planning/interface/state.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from geometry_msgs.msg import PoseWithCovarianceStamped, Vector3, Twist
import copy
import math
from collections.abc import Callable

import numpy as np
from geometry_msgs.msg import PoseWithCovarianceStamped, Vector3
from nav_msgs.msg import Odometry
from rclpy.logging import get_logger
from rclpy.node import Node
Expand All @@ -7,6 +12,7 @@
from task_planning.utils import geometry_utils
from task_planning.utils.other_utils import singleton
from tf2_ros.buffer import Buffer
from transforms3d.euler import quat2euler

logger = get_logger('state_interface')

Expand Down Expand Up @@ -167,3 +173,64 @@ def reset_pose(self) -> None:

if not self.bypass:
self._reset_pose.call_async(request)

def get_yaw_correction(self, maximum_yaw: float = math.radians(30),
get_step_size_func: Callable[[float], float] | None = None) -> float:
"""
Calculate the yaw correction needed to return the robot to its original IMU orientation.

Args:
maximum_yaw (float, optional): The maximum allowed yaw correction in radians for a single step.
Defaults to 30 degrees.
get_step_size_func (Callable, optional): A function that takes the desired yaw correction and returns
the absolute step size to use. If None, defaults to a function limiting the correction to maximum_yaw.

Returns:
float: The computed yaw correction in radians, limited by the step size function.
"""
if not get_step_size_func:
get_step_size_func = lambda desired_yaw: min(abs(desired_yaw), maximum_yaw) # noqa: E731

orig_imu_orientation = copy.deepcopy(self._orig_imu.orientation)
orig_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_imu_orientation))

cur_imu_orientation = copy.deepcopy(self._imu.orientation)
cur_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_imu_orientation))

correction = orig_imu_euler_angles[2] - cur_imu_euler_angles[2]
sign_correction = np.sign(correction)
return sign_correction * get_step_size_func(correction)

def get_gyro_yaw_correction(self, return_raw: bool = True, maximum_yaw: float = math.radians(30),
get_step_size_func: Callable[[float], float] | None = None) -> float:
"""
Calculate the yaw correction needed to return the robot to its original gyro orientation.

Args:
return_raw (bool, optional): If True, returns the raw yaw correction (difference in yaw angles).
If False, returns the processed correction normalized to [0, 2π), limited by the step size function.
Defaults to True.
maximum_yaw (float, optional): The maximum allowed yaw correction in radians for a single step.
Defaults to 30 degrees.
get_step_size_func (Callable, optional): A function that takes the desired yaw correction and returns
the absolute step size to use. If None, defaults to a function limiting the correction to maximum_yaw.

Returns:
float: The computed yaw correction in radians, either raw or processed, limited by the step size function.
"""
orig_gyro_orientation = copy.deepcopy(self._orig_gyro.pose.pose.orientation)
orig_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_gyro_orientation))

cur_gyro_orientation = copy.deepcopy(self._gyro.pose.pose.orientation)
cur_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_gyro_orientation))

raw_correction = orig_gyro_euler_angles[2] - cur_gyro_euler_angles[2]

if return_raw:
return raw_correction

if not get_step_size_func:
get_step_size_func = lambda desired_yaw: min(abs(desired_yaw), maximum_yaw) # noqa: E731
correction = raw_correction % (2 * np.pi)
sign_correction = np.sign(correction)
return sign_correction * get_step_size_func(correction)
2 changes: 1 addition & 1 deletion onboard/src/task_planning/task_planning/robot/crush.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def main(self: Task) -> Task[None, None, None]:
tasks = [
######## Main competition tasks ########
ivc_tasks.delineate_ivc_log(parent=self),
comp_tasks.initial_submerge(-0.5, z_tolerance=0.15, enable_controls_flag=True, timeout=10, parent=self),
comp_tasks.initial_submerge(0.5, z_tolerance=0.15, enable_controls_flag=True, timeout=10, parent=self),
comp_tasks.coin_flip(enable_same_direction=False, parent=self),
comp_tasks.gate_task_dead_reckoning(depth_level=0.7, parent=self), # Move through gate via 2,2; right strafe via 1.5
comp_tasks.gate_style_task(depth_level=0.975, parent=self), # Spin
Expand Down
2 changes: 1 addition & 1 deletion onboard/src/task_planning/task_planning/robot/oogway.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def main(self: Task) -> Task[None, None, None]:
tasks = [
######## Main competition tasks ########
ivc_tasks.delineate_ivc_log(parent=self),
comp_tasks.initial_submerge(-DEPTH, parent=self),
comp_tasks.initial_submerge(DEPTH, parent=self),
comp_tasks.gate_task_dead_reckoning(depth_level=-DEPTH, parent=self),
comp_tasks.torpedo_task(first_target=FIRST_TARGET, depth_level=DEPTH, direction=DIRECTION_OF_TORPEDO_BANNER, parent=self),
# TODO: task not found???
Expand Down
48 changes: 42 additions & 6 deletions onboard/src/task_planning/task_planning/tasks/base_comp_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from rclpy.logging import get_logger
from task_planning.interface.cv import CVObjectType
from task_planning.interface.state import State
from task_planning.task import ReturnType, SendType, Task, YieldType
from task_planning.task import ReturnType, SendType, Task, Yield, YieldType
from task_planning.tasks import cv_tasks, move_tasks
from task_planning.utils import geometry_utils
from transforms3d.euler import quat2euler
Expand Down Expand Up @@ -71,27 +71,32 @@ async def correct_z_to_cv_obj(self, cv_target: CVObjectType, add_factor: float =
"""
await cv_tasks.correct_z(prop=cv_target, add_factor=add_factor, mult_factor=mult_factor, parent=self)

async def correct_yaw(self, yaw_correction: float,
async def correct_yaw(self, yaw_correction: float, yaw_tolerance: float = 0.15, depth_level: float | None = None,
add_factor: float = 0, mult_factor: float = 1, timeout: int = 30) -> None:
"""
Correct yaw orientation by the specified angle with optional adjustments.

Args:
yaw_correction (float): Base yaw correction in radians.
Positive values rotate counter-clockwise, negative values rotate clockwise.
yaw_tolerance (float, optional): Error margin for yaw correction in radians. Defaults to 0.15.
depth_level (float, optional): The depth, as provided by the pressure sensor, the robot should move to.
If this is not None, the Z value of the pose will be overridden. Defaults to None.
add_factor (float, optional): Additional angle to add to correction. Defaults to 0.
mult_factor (float, optional): Multiplier for correction magnitude. Defaults to 1.
timeout (int, optional): The maximum number of seconds to attempt reaching the pose
before timing out. Defaults to 30.
"""
logger.info(f'Correcting yaw by {(yaw_correction + add_factor) * mult_factor} radians')
logger.info(f'[correct_yaw] Correcting yaw by {(yaw_correction + add_factor) * mult_factor} radians')
await move_tasks.move_to_pose_local(
geometry_utils.create_pose(0, 0, 0, 0, 0, (yaw_correction + add_factor) * mult_factor),
keep_orientation=True,
parent=self,
depth_level=depth_level,
pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=yaw_tolerance),
timeout=timeout,
parent=self,
)
logger.info('Corrected yaw')
logger.info('[correct_yaw] Corrected yaw')

async def correct_roll_and_pitch(self, mult_factor: float = 1) -> None:
"""
Expand All @@ -105,10 +110,41 @@ async def correct_roll_and_pitch(self, mult_factor: float = 1) -> None:
roll_correction = -euler_angles[0] * mult_factor
pitch_correction = -euler_angles[1] * mult_factor

logger.info(f'Roll, pitch correction: {roll_correction, pitch_correction}')
logger.info(f'[correct_roll_and_pitch] Roll, pitch correction: {roll_correction, pitch_correction}')
await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, roll_correction, pitch_correction, 0),
parent=self)

async def move_with_directions(self, directions: list[tuple[int | float, int | float, int | float]],
depth_level: float | None = None, correct_yaw: bool = True,
correct_depth: bool = True, timeout: int = 30) -> None:
"""
Move the robot sequentially through a list of specified local directions with optional depth control.

Args:
directions (list[tuple[int | float, int | float, int | float]]): The directions to move the robot in.
Each tuple contains the distance to move the robot in the X, Y, and Z directions respectively.
depth_level (float, optional): The depth, as provided by the pressure sensor, the robot should move to.
If this is not None, the Z value of the provided pose will be overridden. Defaults to None.
correct_yaw (bool, optional): If True, corrects the yaw after moving to a pose. Defaults to True.
correct_depth (bool, optional): If True, corrects the depth after moving to a pose. Defaults to True.
timeout (int, optional): Maximum time in seconds to spend on each directional move. Defaults to 60.
"""
await move_tasks.move_with_directions(directions, depth_level, correct_yaw=correct_yaw,
correct_depth=correct_depth, keep_orientation=True,
timeout=timeout, parent=self)

async def stabilize(self) -> None:
"""Stabilize the robot by publishing the current pose as the desired pose."""
logger.info('[stabilize] Start stabilizing...')

hold_task = move_tasks.hold_position(parent=self)
while True:
if hold_task.step():
break
await Yield()

logger.info('[stabilize] Finished stabilize task.')


def comp_task[YieldType, SendType, ReturnType](func: Callable[..., Coroutine[YieldType, SendType, ReturnType]]) -> \
Callable[..., CompTask[YieldType, SendType, ReturnType]]:
Expand Down
Loading
Loading