diff --git a/onboard/src/task_planning/task_planning/interface/controls.py b/onboard/src/task_planning/task_planning/interface/controls.py index 852c8f56..ce85310f 100644 --- a/onboard/src/task_planning/task_planning/interface/controls.py +++ b/onboard/src/task_planning/task_planning/interface/controls.py @@ -9,9 +9,7 @@ from rclpy.node import Node from std_msgs.msg import Bool from std_srvs.srv import SetBool, Trigger -from task_planning.utils.other_utils import singleton - -from task_planning.utils.other_utils import get_robot_name +from task_planning.utils.other_utils import get_robot_name, singleton logger = get_logger('controls_interface') diff --git a/onboard/src/task_planning/task_planning/interface/cv.py b/onboard/src/task_planning/task_planning/interface/cv.py index c7be13c4..4b763586 100644 --- a/onboard/src/task_planning/task_planning/interface/cv.py +++ b/onboard/src/task_planning/task_planning/interface/cv.py @@ -1,7 +1,7 @@ +import time from enum import Enum from pathlib import Path from typing import ClassVar -import time import numpy as np import resource_retriever as rr @@ -22,6 +22,8 @@ class CVObjectType(Enum): BIN_BLUE = 'bin_blue' BIN_RED = 'bin_red' BIN_WHOLE = 'bin_whole' + BIN_PINK_FRONT = 'bin_pink_front' + BIN_PINK_BOTTOM = 'bin_pink_bottom' BUOY = 'buoy' GATE_REEF_SHARK = 'b' GATE_SAWFISH = 'c' @@ -36,8 +38,6 @@ class CVObjectType(Enum): TORPEDO_LARGEST_TARGET = 'torpedo_largest_target' TORPEDO_LOWER_TARGET = 'h' TORPEDO_UPPER_TARGET = 'g' - BIN_PINK_FRONT = 'bin_pink_front' - BIN_PINK_BOTTOM = 'bin_pink_bottom' @singleton @@ -62,7 +62,6 @@ class CV: MODELS_PATH = 'package://cv/models/depthai_models.yaml' CV_CAMERA = 'front' - # TODO: add other CV models here as defined in depthai_models.yaml. Modify the Enum strings correspondingly. CV_MODELS: ClassVar[list[str]] = ['2025_torpedo'] # Need to see more than TORPEDO_BANNER_RATE_THRESHOLD messages per second @@ -72,6 +71,8 @@ class CV: CVObjectType.BUOY: '/cv/front_usb/buoy/bounding_box', CVObjectType.BIN_BLUE: '/cv/bottom/bin_blue/bounding_box', CVObjectType.BIN_RED: '/cv/bottom/bin_red/bounding_box', + CVObjectType.BIN_PINK_FRONT: 'cv/front_usb/bin_pink_front/bounding_box', + CVObjectType.BIN_PINK_BOTTOM: 'cv/front_usb/bin_pink_bottom/bounding_box', CVObjectType.LANE_MARKER: '/cv/bottom/lane_marker/bounding_box', CVObjectType.PATH_MARKER: '/cv/bottom/path_marker/bounding_box', CVObjectType.TORPEDO_BANNER: '/cv/front/torpedo_banner', @@ -80,8 +81,6 @@ class CV: CVObjectType.TORPEDO_REEF_SHARK_TARGET: '/cv/front_usb/torpedo_reef_shark_target/bounding_box', CVObjectType.TORPEDO_SAWFISH_TARGET: '/cv/front_usb/torpedo_sawfish_target/bounding_box', CVObjectType.TORPEDO_LARGEST_TARGET: '/cv/front_usb/torpedo_largest_target/bounding_box', - CVObjectType.BIN_PINK_FRONT: 'cv/front_usb/bin_pink_front/bounding_box', - CVObjectType.BIN_PINK_BOTTOM: 'cv/front_usb/bin_pink_bottom/bounding_box', } DISTANCE_TOPICS: ClassVar[dict[CVObjectType, str]] = { diff --git a/onboard/src/task_planning/task_planning/interface/ivc.py b/onboard/src/task_planning/task_planning/interface/ivc.py index 5b695aab..999d6005 100644 --- a/onboard/src/task_planning/task_planning/interface/ivc.py +++ b/onboard/src/task_planning/task_planning/interface/ivc.py @@ -9,7 +9,6 @@ from rclpy.node import Node from rclpy.task import Future from rclpy.time import Time - from task_planning.utils.other_utils import singleton logger = get_logger('ivc_interface') diff --git a/onboard/src/task_planning/task_planning/interface/servos.py b/onboard/src/task_planning/task_planning/interface/servos.py index 893dc044..2393e344 100644 --- a/onboard/src/task_planning/task_planning/interface/servos.py +++ b/onboard/src/task_planning/task_planning/interface/servos.py @@ -7,8 +7,7 @@ from rclpy.logging import get_logger from rclpy.node import Node from rclpy.task import Future -from task_planning.utils.other_utils import singleton -from task_planning.utils.other_utils import get_robot_name +from task_planning.utils.other_utils import get_robot_name, singleton logger = get_logger('servos_interface') diff --git a/onboard/src/task_planning/task_planning/interface/state.py b/onboard/src/task_planning/task_planning/interface/state.py index 1860d950..47178d9f 100644 --- a/onboard/src/task_planning/task_planning/interface/state.py +++ b/onboard/src/task_planning/task_planning/interface/state.py @@ -1,4 +1,9 @@ -from geometry_msgs.msg import PoseWithCovarianceStamped, Vector3, Twist +import copy +import math +from collections.abc import Callable + +import numpy as np +from geometry_msgs.msg import PoseWithCovarianceStamped, Vector3 from nav_msgs.msg import Odometry from rclpy.logging import get_logger from rclpy.node import Node @@ -7,6 +12,7 @@ from task_planning.utils import geometry_utils from task_planning.utils.other_utils import singleton from tf2_ros.buffer import Buffer +from transforms3d.euler import quat2euler logger = get_logger('state_interface') @@ -167,3 +173,64 @@ def reset_pose(self) -> None: if not self.bypass: self._reset_pose.call_async(request) + + def get_yaw_correction(self, maximum_yaw: float = math.radians(30), + get_step_size_func: Callable[[float], float] | None = None) -> float: + """ + Calculate the yaw correction needed to return the robot to its original IMU orientation. + + Args: + maximum_yaw (float, optional): The maximum allowed yaw correction in radians for a single step. + Defaults to 30 degrees. + get_step_size_func (Callable, optional): A function that takes the desired yaw correction and returns + the absolute step size to use. If None, defaults to a function limiting the correction to maximum_yaw. + + Returns: + float: The computed yaw correction in radians, limited by the step size function. + """ + if not get_step_size_func: + get_step_size_func = lambda desired_yaw: min(abs(desired_yaw), maximum_yaw) # noqa: E731 + + orig_imu_orientation = copy.deepcopy(self._orig_imu.orientation) + orig_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_imu_orientation)) + + cur_imu_orientation = copy.deepcopy(self._imu.orientation) + cur_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_imu_orientation)) + + correction = orig_imu_euler_angles[2] - cur_imu_euler_angles[2] + sign_correction = np.sign(correction) + return sign_correction * get_step_size_func(correction) + + def get_gyro_yaw_correction(self, return_raw: bool = True, maximum_yaw: float = math.radians(30), + get_step_size_func: Callable[[float], float] | None = None) -> float: + """ + Calculate the yaw correction needed to return the robot to its original gyro orientation. + + Args: + return_raw (bool, optional): If True, returns the raw yaw correction (difference in yaw angles). + If False, returns the processed correction normalized to [0, 2π), limited by the step size function. + Defaults to True. + maximum_yaw (float, optional): The maximum allowed yaw correction in radians for a single step. + Defaults to 30 degrees. + get_step_size_func (Callable, optional): A function that takes the desired yaw correction and returns + the absolute step size to use. If None, defaults to a function limiting the correction to maximum_yaw. + + Returns: + float: The computed yaw correction in radians, either raw or processed, limited by the step size function. + """ + orig_gyro_orientation = copy.deepcopy(self._orig_gyro.pose.pose.orientation) + orig_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_gyro_orientation)) + + cur_gyro_orientation = copy.deepcopy(self._gyro.pose.pose.orientation) + cur_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_gyro_orientation)) + + raw_correction = orig_gyro_euler_angles[2] - cur_gyro_euler_angles[2] + + if return_raw: + return raw_correction + + if not get_step_size_func: + get_step_size_func = lambda desired_yaw: min(abs(desired_yaw), maximum_yaw) # noqa: E731 + correction = raw_correction % (2 * np.pi) + sign_correction = np.sign(correction) + return sign_correction * get_step_size_func(correction) diff --git a/onboard/src/task_planning/task_planning/robot/crush.py b/onboard/src/task_planning/task_planning/robot/crush.py index 63fea315..e8a92d6c 100644 --- a/onboard/src/task_planning/task_planning/robot/crush.py +++ b/onboard/src/task_planning/task_planning/robot/crush.py @@ -15,7 +15,7 @@ async def main(self: Task) -> Task[None, None, None]: tasks = [ ######## Main competition tasks ######## ivc_tasks.delineate_ivc_log(parent=self), - comp_tasks.initial_submerge(-0.5, z_tolerance=0.15, enable_controls_flag=True, timeout=10, parent=self), + comp_tasks.initial_submerge(0.5, z_tolerance=0.15, enable_controls_flag=True, timeout=10, parent=self), comp_tasks.coin_flip(enable_same_direction=False, parent=self), comp_tasks.gate_task_dead_reckoning(depth_level=0.7, parent=self), # Move through gate via 2,2; right strafe via 1.5 comp_tasks.gate_style_task(depth_level=0.975, parent=self), # Spin diff --git a/onboard/src/task_planning/task_planning/robot/oogway.py b/onboard/src/task_planning/task_planning/robot/oogway.py index f3872ed8..052a2d70 100644 --- a/onboard/src/task_planning/task_planning/robot/oogway.py +++ b/onboard/src/task_planning/task_planning/robot/oogway.py @@ -19,7 +19,7 @@ async def main(self: Task) -> Task[None, None, None]: tasks = [ ######## Main competition tasks ######## ivc_tasks.delineate_ivc_log(parent=self), - comp_tasks.initial_submerge(-DEPTH, parent=self), + comp_tasks.initial_submerge(DEPTH, parent=self), comp_tasks.gate_task_dead_reckoning(depth_level=-DEPTH, parent=self), comp_tasks.torpedo_task(first_target=FIRST_TARGET, depth_level=DEPTH, direction=DIRECTION_OF_TORPEDO_BANNER, parent=self), # TODO: task not found??? diff --git a/onboard/src/task_planning/task_planning/tasks/base_comp_task.py b/onboard/src/task_planning/task_planning/tasks/base_comp_task.py index 47eefab0..e16efbe3 100644 --- a/onboard/src/task_planning/task_planning/tasks/base_comp_task.py +++ b/onboard/src/task_planning/task_planning/tasks/base_comp_task.py @@ -4,7 +4,7 @@ from rclpy.logging import get_logger from task_planning.interface.cv import CVObjectType from task_planning.interface.state import State -from task_planning.task import ReturnType, SendType, Task, YieldType +from task_planning.task import ReturnType, SendType, Task, Yield, YieldType from task_planning.tasks import cv_tasks, move_tasks from task_planning.utils import geometry_utils from transforms3d.euler import quat2euler @@ -71,7 +71,7 @@ async def correct_z_to_cv_obj(self, cv_target: CVObjectType, add_factor: float = """ await cv_tasks.correct_z(prop=cv_target, add_factor=add_factor, mult_factor=mult_factor, parent=self) - async def correct_yaw(self, yaw_correction: float, + async def correct_yaw(self, yaw_correction: float, yaw_tolerance: float = 0.15, depth_level: float | None = None, add_factor: float = 0, mult_factor: float = 1, timeout: int = 30) -> None: """ Correct yaw orientation by the specified angle with optional adjustments. @@ -79,19 +79,24 @@ async def correct_yaw(self, yaw_correction: float, Args: yaw_correction (float): Base yaw correction in radians. Positive values rotate counter-clockwise, negative values rotate clockwise. + yaw_tolerance (float, optional): Error margin for yaw correction in radians. Defaults to 0.15. + depth_level (float, optional): The depth, as provided by the pressure sensor, the robot should move to. + If this is not None, the Z value of the pose will be overridden. Defaults to None. add_factor (float, optional): Additional angle to add to correction. Defaults to 0. mult_factor (float, optional): Multiplier for correction magnitude. Defaults to 1. timeout (int, optional): The maximum number of seconds to attempt reaching the pose before timing out. Defaults to 30. """ - logger.info(f'Correcting yaw by {(yaw_correction + add_factor) * mult_factor} radians') + logger.info(f'[correct_yaw] Correcting yaw by {(yaw_correction + add_factor) * mult_factor} radians') await move_tasks.move_to_pose_local( geometry_utils.create_pose(0, 0, 0, 0, 0, (yaw_correction + add_factor) * mult_factor), keep_orientation=True, - parent=self, + depth_level=depth_level, + pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=yaw_tolerance), timeout=timeout, + parent=self, ) - logger.info('Corrected yaw') + logger.info('[correct_yaw] Corrected yaw') async def correct_roll_and_pitch(self, mult_factor: float = 1) -> None: """ @@ -105,10 +110,41 @@ async def correct_roll_and_pitch(self, mult_factor: float = 1) -> None: roll_correction = -euler_angles[0] * mult_factor pitch_correction = -euler_angles[1] * mult_factor - logger.info(f'Roll, pitch correction: {roll_correction, pitch_correction}') + logger.info(f'[correct_roll_and_pitch] Roll, pitch correction: {roll_correction, pitch_correction}') await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, roll_correction, pitch_correction, 0), parent=self) + async def move_with_directions(self, directions: list[tuple[int | float, int | float, int | float]], + depth_level: float | None = None, correct_yaw: bool = True, + correct_depth: bool = True, timeout: int = 30) -> None: + """ + Move the robot sequentially through a list of specified local directions with optional depth control. + + Args: + directions (list[tuple[int | float, int | float, int | float]]): The directions to move the robot in. + Each tuple contains the distance to move the robot in the X, Y, and Z directions respectively. + depth_level (float, optional): The depth, as provided by the pressure sensor, the robot should move to. + If this is not None, the Z value of the provided pose will be overridden. Defaults to None. + correct_yaw (bool, optional): If True, corrects the yaw after moving to a pose. Defaults to True. + correct_depth (bool, optional): If True, corrects the depth after moving to a pose. Defaults to True. + timeout (int, optional): Maximum time in seconds to spend on each directional move. Defaults to 60. + """ + await move_tasks.move_with_directions(directions, depth_level, correct_yaw=correct_yaw, + correct_depth=correct_depth, keep_orientation=True, + timeout=timeout, parent=self) + + async def stabilize(self) -> None: + """Stabilize the robot by publishing the current pose as the desired pose.""" + logger.info('[stabilize] Start stabilizing...') + + hold_task = move_tasks.hold_position(parent=self) + while True: + if hold_task.step(): + break + await Yield() + + logger.info('[stabilize] Finished stabilize task.') + def comp_task[YieldType, SendType, ReturnType](func: Callable[..., Coroutine[YieldType, SendType, ReturnType]]) -> \ Callable[..., CompTask[YieldType, SendType, ReturnType]]: diff --git a/onboard/src/task_planning/task_planning/tasks/comp_tasks.py b/onboard/src/task_planning/task_planning/tasks/comp_tasks.py index 1063af66..2b987b6f 100644 --- a/onboard/src/task_planning/task_planning/tasks/comp_tasks.py +++ b/onboard/src/task_planning/task_planning/tasks/comp_tasks.py @@ -1,10 +1,7 @@ # ruff: noqa -import copy import math -from collections.abc import Coroutine from enum import Enum -from typing import Literal import numpy as np from custom_msgs.msg import ControlTypes @@ -12,58 +9,181 @@ from rclpy.clock import Clock from rclpy.duration import Duration from rclpy.logging import get_logger -from transforms3d.euler import quat2euler from task_planning.interface.controls import Controls from task_planning.interface.cv import CV, CVObjectType from task_planning.interface.servos import Servos, MarkerDropperStates, TorpedoStates from task_planning.interface.state import State -from task_planning.interface.ivc import IVC, IVCMessageType from task_planning.interface.sonar import Sonar -from task_planning.task import Task, Yield, task -from task_planning.tasks import cv_tasks, move_tasks, util_tasks, ivc_tasks +from task_planning.task import Task, Yield +from task_planning.tasks import move_tasks, util_tasks +from task_planning.tasks.base_comp_task import CompTask, comp_task from task_planning.utils import geometry_utils from rclpy.clock import Clock from task_planning.utils.other_utils import get_robot_name, RobotName -# TODO: move stablize() to move_tasks.py -# -# TODO: create a common skeleton @task class/interface with all the common functions to remove redundancy: -# - move_x -# - move_y -# - move_z -# - correct_x -# - correct_y -# - correct_z -# - correct_yaw -# - correct_roll_and_pitch -# - get_yaw_correction -# - ... -# These implementations can be overridden by the tasks that uses that interface. -# + # TODO: look into creating common higher level routines: -# - yaw_to_cv_object +# - move_until_object_detection (e.g. path_marker_to_marker_dropper_bins) # - spiral search (e.g. spiral_bin_search) # - logarithmic search (stretch) -# - track and align with object center for bottom camera (e.g. search_for_bins & center_path_marker) -# - track and move toward CV object (e.g. move_to_pink_bins & move_to_buoy) +# - track and align with object center for bottom camera (e.g. search_for_bins & center_path_marker & track_bin) +# - track and move toward CV object (e.g. move_to_pink_bins & move_to_buoy & move_to_torpedo) # - takes in the termination condition function as a parameter # - can improve on cv_tasks.move_to_cv_obj implementation (or replace it completely) -# TODO: rewrite tasks using base comp tasks # TODO: implement higher-level routines # TODO: refactor sonar_tasks # TODO: refactor ivc_tasks logger = get_logger('comp_tasks') -@task -async def gate_style_task(self: Task, depth_level=0.9) -> Task[None, None, None]: + +@comp_task +async def initial_submerge(self: CompTask, depth_level: float, z_tolerance: float = 0.1, + enable_controls_flag: bool = False, timeout: int = 30) -> Task[None, None, None]: + """ + Submerge the robot a given amount. + + Args: + depth_level: The distance to submerge the robot in meters. + enable_controls_flag: Flag to wait for ENABLE_CONTROLS status when true. + """ + logger.info("[initial_submerge] Starting initial submerge") + + while enable_controls_flag and not Controls().enable_controls_status.data: + await Yield() + + await move_tasks.move_to_pose_local( + geometry_utils.create_pose(0, 0, -depth_level, 0, 0, 0), + keep_orientation=True, + pose_tolerances=move_tasks.create_twist_tolerance(linear_z=z_tolerance), + timeout=timeout, + parent=self, + ) + logger.info(f'[initial_submerge] Submerged {depth_level} meters') + + +@comp_task +async def coin_flip(self: CompTask, depth_level: float = 0.7, + enable_same_direction: bool = True, timeout: int = 15) -> Task[None, None, None]: + """ + Perform the coin flip task, adjusting the robot's yaw and depth. + + The coin flip task involves correcting the robot's yaw to return it to its original orientation + and then adjusting its depth. The task continuously calculates yaw corrections based on the difference + between the current and original orientations, making incremental adjustments until the yaw is within + a specified threshold. After correcting yaw, the robot adjusts its depth to reach the desired level. + + Args: + self (CompTask): The task instance managing the execution of the coin flip task. + depth_level (float): The depth adjustment level relative to the robot's original depth. Default is 0.7. + + Returns: + Task[None, None, None]: The result of the task execution. + + Detailed Process: + 1. Calculate the desired yaw correction using the difference between the original and current IMU orientations. + 2. Gradually adjust yaw in steps, ensuring the correction does not exceed the maximum allowed yaw change. + 3. Once the yaw is corrected to within 5 degrees, adjust the robot's depth to the specified level. + 4. Log each step of the process for debugging and traceability. + + Logging: + - Logs the initial start of the coin flip task. + - Logs intermediate yaw corrections and desired yaw adjustments. + - Logs depth corrections and the final completion of the task. + + Example: + >>> await coin_flip(task_instance, depth_level=0.5) + + Notes: + - Uses `State` to access robot's current and original states, including depth and IMU orientation. + - Uses `geometry_utils` to create poses for yaw and depth corrections. + - The task continuously loops until the yaw correction is within the specified threshold (±5 degrees). + """ + DEPTH_LEVEL = State().orig_depth - depth_level + + if enable_same_direction: + while abs(State().get_gyro_yaw_correction(return_raw=True)) > math.radians(5): + yaw_correction = State().get_gyro_yaw_correction(return_raw=False, maximum_yaw=2*np.pi) + logger.info(f'[coin_flip] Yaw correction: {yaw_correction}') + + if yaw_correction > np.pi: + yaw_correction -= np.pi + await self.correct_yaw(np.pi, timeout=timeout) + logger.info('[coin_flip] Yaw correct 180') + + logger.info(f'[coin_flip] Yaw correct remainder: {yaw_correction}') + await self.correct_yaw(yaw_correction, timeout=timeout) + + else: + while abs(State().get_gyro_yaw_correction(return_raw=True)) > math.radians(5): + yaw_correction = State().get_gyro_yaw_correction(return_raw=False, maximum_yaw=2*np.pi) + logger.info(f'[coin_flip] Yaw correction: {yaw_correction}') + + await self.correct_yaw(yaw_correction, timeout=timeout) + + logger.info(f'[coin_flip] Final yaw offset: {State().get_gyro_yaw_correction(return_raw=True)}') + + await self.correct_depth(DEPTH_LEVEL) + logger.info('[coin_flip] Completed coin flip') + + +@comp_task +async def gate_task(self: CompTask, offset: int = 0, direction: int = 1) -> Task[None, None, None]: + """ + NOTE: This code is assuming we choose the sawfish side of the gate. + """ + logger.info('[gate_task] Started gate task') + + DEPTH_LEVEL = State().orig_depth - 0.7 + + def get_step_size(dist: float) -> float: + dist_threshold = 4 + if dist > dist_threshold: + return 1 + return max(dist - 2.75, 0.25) + + await util_tasks.sleep(2, parent=self) + + gate_dist = CV().bounding_boxes[CVObjectType.GATE_SAWFISH].coords.x + # await self.correct_y_to_cv_obj(CVObjectType.GATE_SAWFISH, add_factor=0.2 + offset, mult_factor=0.5) + await self.correct_depth(DEPTH_LEVEL) + + num_corrections = 0 + while gate_dist > 3: + await self.move_x(step=get_step_size(gate_dist)) + + await yaw_to_cv_object(CVObjectType.GATE_SAWFISH, direction=-1, yaw_threshold=math.radians(10), + latency_threshold=2, depth_level=0.6, parent=self), + # await self.correct_y_to_cv_obj(CVObjectType.GATE_SAWFISH, add_factor=0.2 + offset, + # mult_factor=(0.5 if num_corrections < 0 else 1)) + await self.correct_depth(DEPTH_LEVEL) + + await Yield() + gate_dist = CV().bounding_boxes[CVObjectType.GATE_SAWFISH].coords.x + logger.info(f'[gate_task] Gate dist: {gate_dist}') + + num_corrections += 1 + + directions = [ + (2, 0, 0), + (0, 0.2 * direction, 0), + (2, 0, 0), + (1, 0, 0), + ] + + await self.move_with_directions(directions, correct_yaw=False) + logger.info('[gate_task] Moved through gate') + + +@comp_task +async def gate_style_task(self: CompTask, depth_level: float = 0.9) -> Task[None, None, None]: """ Complete two full barrel rolls. """ - logger.info('Started gate style task') + logger.info('[gate_style_task] Started gate style task') DEPTH_LEVEL = State().orig_depth - depth_level @@ -71,67 +191,215 @@ async def roll(): power = Twist() power.angular.x = 1.0 Controls().publish_desired_power(power) - logger.info('Published roll power') + logger.info('[gate_style_task] Published roll power') if get_robot_name() == RobotName.OOGWAY: await util_tasks.sleep(2.25, parent=self) else: await util_tasks.sleep(1.40, parent=self) - logger.info('Completed roll') + logger.info('[gate_style_task] Completed roll') Controls().publish_desired_power(Twist()) - logger.info('Published zero power') + logger.info('[gate_style_task] Published zero power') await util_tasks.sleep(2, parent=self) + logger.info('[gate_style_task] Completed zero') - logger.info('Completed zero') - - await move_tasks.depth_correction(DEPTH_LEVEL, parent=self) + await self.correct_depth(DEPTH_LEVEL) await roll() State().reset_pose() await util_tasks.sleep(2.5, parent=self) - await move_tasks.depth_correction(DEPTH_LEVEL, parent=self) + await self.correct_depth(DEPTH_LEVEL) await roll() State().reset_pose() await util_tasks.sleep(2.5, parent=self) - await move_tasks.depth_correction(DEPTH_LEVEL, parent=self) + await self.correct_depth(DEPTH_LEVEL) await util_tasks.sleep(2.5, parent=self) - imu_orientation = State().imu.orientation - euler_angles = quat2euler([imu_orientation.w, imu_orientation.x, imu_orientation.y, imu_orientation.z]) - roll_correction = -euler_angles[0] - pitch_correction = -euler_angles[1] + await self.correct_roll_and_pitch() + logger.info('[gate_style_task] Reset orientation') - logger.info(f'Roll, pitch, yaw correction: {roll_correction, pitch_correction}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, roll_correction, pitch_correction, 0), - parent=self) - #State().reset_pose() - logger.info('Reset orientation') -@task -async def buoy_task(self: Task, turn_to_face_buoy: bool = False, depth: float = 0.7) -> Task[None, None, None]: - """Circumnavigate the buoy. Requires robot to have submerged 0.5 meters.""" - logger.info('Starting buoy task') +@comp_task +async def gate_task_dead_reckoning(self: CompTask, depth_level=0.7) -> Task[None, None, None]: + logger.info('[gate_task_dead_reckoning] Started gate task') - DEPTH_LEVEL = State().orig_depth - depth + DEPTH_LEVEL = State().orig_depth - depth_level - async def correct_y() -> Coroutine[None, None, None]: - await cv_tasks.correct_y(prop=CVObjectType.BUOY, mult_factor=0.4, parent=self) + directions = [] + if get_robot_name() == RobotName.OOGWAY: + directions = [ + # Go through gate + (3, 0, 0), + (3, 0, 0), + # Dead reckon to torpedo + (0, 3, 0), + (0, 3, 0) + ] - async def correct_z() -> Coroutine[None, None, None]: - await cv_tasks.correct_z(prop=CVObjectType.BUOY, parent=self) + elif get_robot_name() == RobotName.CRUSH: + directions = [ + (2, 0, 0), + (3, 0, 0), + ] + + await self.move_with_directions(directions, depth_level=DEPTH_LEVEL, timeout=15) + logger.info('[gate_task_dead_reckoning] Moved through gate, and strafed.') + + +@comp_task +async def yaw_until_object_detection(self: CompTask, cv_object: CVObjectType, depth_level: float = 0.7, + latency_threshold=10, direction=1) -> Task[None, None, None] | bool: + logger.info('[yaw_until_object_detection] Beginning yaw_util_object_detection task') + + MAXIMUM_YAW = math.radians(30) + + iteration = 1 + while not CV().is_receiving_recent_cv_data(cv_object, latency_threshold): + if iteration <= 3: + angle = MAXIMUM_YAW + elif iteration == 4: + angle = -2 * MAXIMUM_YAW + elif iteration <= 8: + angle = -1 * MAXIMUM_YAW + else: + angle = 3 * MAXIMUM_YAW + logger.info(f'[yaw_until_object_detection] Yawed to find {cv_object} more than 9 times, breaking loop.') - async def correct_depth() -> Coroutine[None, None, None]: - await move_tasks.correct_depth(desired_depth=DEPTH_LEVEL, parent=self) - self.correct_depth = correct_depth + logger.info(f'[yaw_until_object_detection] No {cv_object} detection, setting yaw setpoint {angle}') + await self.correct_yaw(angle * direction, yaw_tolerance=0.3, depth_level=depth_level, timeout=10) - async def move_x(step:float = 1) -> Coroutine[None, None, None]: - await move_tasks.move_x(step=step, parent=self) + if iteration > 8: + return False - def get_step_size(dist:float, dist_threshold:float) -> float: + # await self.correct_depth(depth_level) + iteration += 1 + await Yield() + + return True + + +@comp_task +async def yaw_to_cv_object(self: CompTask, cv_object: CVObjectType, direction=1, + yaw_threshold=math.radians(40), latency_threshold=10, + depth_level=0.5, use_position_control=True) -> Task[None, None, None] | bool: + """ + Corrects the yaw relative to the CV object. + """ + DEPTH_LEVEL = State().orig_depth - depth_level + MAXIMUM_YAW = math.radians(30) + POSITION_SCALE_FACTOR = 0.4 # How much the correction should be scaled down from yaw calculation + VELOCITY_SCALE_FACTOR = 0.1 + + logger.info('[yaw_to_cv_object] Starting yaw_to_cv_object') + + def get_step_size(desired_yaw): + # desired yaw in radians + return min(abs(desired_yaw), MAXIMUM_YAW) + + def get_yaw_threshold(desired_yaw, cv_x): + if cv_x < 128 or cv_x > 512: + yaw_threshold = desired_yaw * 2 + if cv_x < 256 or cv_x > 384: + yaw_threshold = desired_yaw * 1.75 + else: + yaw_threshold = desired_yaw * 1.35 + + return yaw_threshold + + async def get_robust_cv_object_yaw(): + # Take average of 5 numbers and use that for yaw to try and offset outliers + logger.info('[yaw_to_cv_object] Taking in 5 frames to calculate yaw offset') + + cv_yaws = [] + for _ in range(5): + cv_yaws.append(CV().bounding_boxes[cv_object].yaw) + await util_tasks.sleep(0.25, parent=self) + + yaws_sorted = sorted(cv_yaws) + trimmed = yaws_sorted[1:-1] # remove min and max + return sum(trimmed) / len(trimmed) + + # Yaw until object detection + # await self.correct_depth(DEPTH_LEVEL) + found = await yaw_until_object_detection(cv_object, depth_level=DEPTH_LEVEL, + latency_threshold=latency_threshold, direction=direction) + + # Could not find, so just surface and pray... + if not found: + return False + + logger.info(f'[yaw_to_cv_object] {cv_object} detected. Now centering {cv_object} in frame...') + + # Center detected object in camera frame + cv_object_yaw = await get_robust_cv_object_yaw() + + await self.correct_depth(DEPTH_LEVEL) + logger.info(f'[yaw_to_cv_object] abs(cv_object_yaw): {abs(cv_object_yaw)}') + logger.info(f'[yaw_to_cv_object] yaw_threshold: {yaw_threshold}') + + step = 1 + while abs(cv_object_yaw) > get_yaw_threshold(yaw_threshold, CV().bounding_boxes[cv_object].coords.x): + # If we have made 3 corrections already, trust that yaw is reasonable and continue forward + if step > 3: + logger.info('[yaw_to_cv_object] Yaw has been corrected more than 3 times, breaking loop.') + break + + # Actually do the yaw itself, and then correct depth + if use_position_control: + sign_cv_object_yaw = np.sign(cv_object_yaw) + correction = get_step_size(POSITION_SCALE_FACTOR * cv_object_yaw) # Scale down CV yaw value + + # Robot agnostic code base fails once again + if get_robot_name() == RobotName.OOGWAY: + desired_yaw = sign_cv_object_yaw * correction + else: + desired_yaw = -1 * sign_cv_object_yaw * correction + + logger.info(f'[yaw_to_cv_object] Detected yaw {cv_object_yaw} is greater than threshold {yaw_threshold}. ' + f'Actually yawing: {desired_yaw}') + await self.correct_yaw(desired_yaw, yaw_tolerance=0.15) + + else: + logger.info(f'[yaw_to_cv_object] Detected yaw {cv_object_yaw} is greater than threshold {yaw_threshold}. ' + f'Setting yaw power to: {cv_object_yaw * VELOCITY_SCALE_FACTOR}') + Controls().publish_desired_power(Twist(angular=Vector3(z=cv_object_yaw * VELOCITY_SCALE_FACTOR))) + + await self.correct_depth(DEPTH_LEVEL) + await Yield() + + # Over correction + if (not CV().is_receiving_recent_cv_data(cv_object, latency_threshold)): + logger.info(f'[yaw_to_cv_object] {cv_object} detection lost, running yaw_until_object_detection()') + await yaw_until_object_detection(cv_object, depth_level=DEPTH_LEVEL, + latency_threshold=latency_threshold, direction=direction) + + # Recalculate the yaw + cv_object_yaw = await get_robust_cv_object_yaw() + + step += 1 + + logger.info(f'[yaw_to_cv_object] {cv_object} centered, or limit has been reached.') + + await self.correct_depth(DEPTH_LEVEL) + + return True + + +@comp_task +async def buoy_task(self: CompTask, turn_to_face_buoy: bool = False, + depth_level: float = 0.7) -> Task[None, None, None]: + """ + Circumnavigate the buoy. Requires robot to have submerged 0.5 meters. + """ + logger.info('[buoy_task] Starting buoy task') + + DEPTH_LEVEL = State().orig_depth - depth_level + + def get_step_size(dist: float, dist_threshold: float) -> float: if dist > 3: return 2 if dist > 2: @@ -142,50 +410,27 @@ def get_step_size(dist:float, dist_threshold:float) -> float: async def move_to_buoy(buoy_dist_threshold=1): buoy_dist = CV().bounding_boxes[CVObjectType.BUOY].coords.x - await correct_y() - await correct_depth() + await self.correct_y_to_cv_obj(CVObjectType.BUOY, mult_factor=0.4) + await self.correct_depth(DEPTH_LEVEL) while buoy_dist > buoy_dist_threshold: - await move_x(step=get_step_size(buoy_dist, buoy_dist_threshold)) - logger.info(f"Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") - await correct_y() + await self.move_x(step=get_step_size(buoy_dist, buoy_dist_threshold)) + logger.info(f"[buoy_task] Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") + + await self.correct_y_to_cv_obj(CVObjectType.BUOY, mult_factor=0.4) if buoy_dist < 3: - await correct_z() + await self.correct_z_to_cv_obj(CVObjectType.BUOY) else: - await correct_depth() + await self.correct_depth(DEPTH_LEVEL) await Yield() - buoy_dist = CV().bounding_boxes[CVObjectType.BUOY] - logger.info(f"Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") + buoy_dist = CV().bounding_boxes[CVObjectType.BUOY].coords.x + logger.info(f"[buoy_task] Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") - await correct_z() + await self.correct_z_to_cv_obj(CVObjectType.BUOY) await move_to_buoy() - start_imu_orientation = copy.deepcopy(State().imu.orientation) - start_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(start_imu_orientation)) - - def get_yaw_correction(): - cur_imu_orientation = copy.deepcopy(State().imu.orientation) - cur_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_imu_orientation)) - - return start_imu_euler_angles[2] - cur_imu_euler_angles[2] - - async def correct_yaw(): - yaw_correction = get_yaw_correction() - logger.info(f'Yaw correction: {yaw_correction}') - sign = 1 if yaw_correction > 0.1 else (-1 if yaw_correction < -0.1 else 0) - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction + (sign * 0.1)), - keep_orientation=True, - parent=self, - ) - logger.info('Corrected yaw') - self.correct_yaw = correct_yaw - - async def move_with_directions(directions, correct_yaw=True): - await move_tasks.move_with_directions(directions, correct_yaw=correct_yaw, correct_depth=True, parent=self) - if turn_to_face_buoy: def get_step_size_move_away(dist, dist_threshold): if dist < 0.75: @@ -193,33 +438,33 @@ def get_step_size_move_away(dist, dist_threshold): return max(dist - dist_threshold - 0.1, -0.25) async def move_away_from_buoy(buoy_dist_threshold=1.0): - logger.info('Moving away from buoy') + logger.info('[buoy_task] Moving away from buoy') buoy_dist = CV().bounding_boxes[CVObjectType.BUOY].coords.x - await correct_y() - await correct_z() + await self.correct_y_to_cv_obj(CVObjectType.BUOY, mult_factor=0.4) + await self.correct_z_to_cv_obj(CVObjectType.BUOY) + while buoy_dist < buoy_dist_threshold: - await move_x(step=get_step_size_move_away(buoy_dist, buoy_dist_threshold)) + await self.move_x(step=get_step_size_move_away(buoy_dist, buoy_dist_threshold)) + + logger.info(f"[buoy_task] Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") + await self.correct_y_to_cv_obj(CVObjectType.BUOY, mult_factor=0.4) + await self.correct_z_to_cv_obj(CVObjectType.BUOY) - logger.info(f"Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") - await correct_y() - await correct_z() await Yield() buoy_dist = CV().bounding_boxes[CVObjectType.BUOY].coords.x - logger.info(f"Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") + logger.info(f"[buoy_task] Buoy dist: {CV().bounding_boxes[CVObjectType.BUOY].coords.x}") - logger.info('Moved away from buoy') + logger.info('[buoy_task] Moved away from buoy') # Circumnavigate buoy for _ in range(4): - DEPTH_LEVEL = State().depth directions = [ (0, 1.5, 0), (1, 0, 0), ] - await move_with_directions(directions, correct_yaw=False) - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, -math.radians(90)), - parent=self) - logger.info('Yaw 90 deg') + await self.move_with_directions(directions, correct_yaw=False, correct_depth=True) + await self.correct_yaw(-np.pi / 2) + logger.info('[buoy_task] Yaw 90 degrees') await move_away_from_buoy() else: @@ -230,24 +475,52 @@ async def move_away_from_buoy(buoy_dist_threshold=1.0): (-2.5, 0, 0), (0, 1.25, 0), ] - await move_with_directions(directions, correct_yaw=False) - + await self.move_with_directions(directions, correct_yaw=False, correct_depth=True) await move_to_buoy() -@task -async def after_buoy_task(self: Task): +@comp_task +async def buoy_circumnavigation_power(self: CompTask, depth_level: float = 0.7) -> Task[None, None, None]: + """ + Perform a buoy circumnavigation task with a specified depth adjustment. - DEPTH_LEVEL = State().depth - latency_threshold = 3 + Args: + self (Task): The task instance. + depth (float): The depth offset to adjust the circumnavigation. Default is 0.7. - async def correct_depth(): - await move_tasks.correct_depth(desired_depth=DEPTH_LEVEL, parent=self) - self.correct_depth = correct_depth + Returns: + Task[None, None, None]: The result of the circumnavigation task. + """ + DEPTH_LEVEL = State().orig_depth - depth_level - def stabilize(): - pose_to_hold = copy.deepcopy(State().state.pose.pose) - Controls().publish_desired_position(pose_to_hold) + def publish_power() -> None: + power = Twist() + power.linear.y = 0.9 + power.angular.z = -0.1 + + Controls().set_axis_control_type(x=ControlTypes.DESIRED_POWER, y=ControlTypes.DESIRED_POWER, + yaw=ControlTypes.DESIRED_POWER) + Controls().publish_desired_power(power, set_control_types=False) + + for _ in range(4): + publish_power() + logger.info('[buoy_circumnavigation_power] Publish power') + + await util_tasks.sleep(5, parent=self) + logger.info('[buoy_circumnavigation_power] Sleep 5 (1)') + + await self.stabilize() + logger.info('[buoy_circumnavigation_power] Stabilized') + await util_tasks.sleep(5, parent=self) + logger.info('[buoy_circumnavigation_power] Sleep 5 (2)') + + await self.correct_depth(DEPTH_LEVEL) + await self.move_y(step=1) + + +@comp_task +async def after_buoy_task(self: CompTask): + LATENCY_THRESHOLD = 3 directions = [ (0, -1.25, 0), @@ -261,19 +534,17 @@ def stabilize(): while not circumnavigate_task.done: circumnavigate_task.step() - if CV().is_receiving_recent_cv_data(CVObjectType.PATH_MARKER, latency_threshold): - stabilize() + if CV().is_receiving_recent_cv_data(CVObjectType.PATH_MARKER, latency=LATENCY_THRESHOLD): + await self.stabilize() await util_tasks.sleep(5, parent=self) break await Yield() - await move_tasks.move_with_directions([(0, 0, 0, 0, 0, -math.radians(90))], parent=self) + await self.correct_yaw(-np.pi / 2) await align_path_marker(direction=-1, parent=self) - DEPTH_LEVEL = State().orig_depth - 0.7 - directions = [ (2, 0, 0), (2, 0, 0), @@ -281,31 +552,22 @@ def stabilize(): (1, 0, 0), ] - await move_tasks.move_with_directions(directions, correct_yaw=False, correct_depth=True, parent=self) + await self.move_with_directions(directions, correct_yaw=False, correct_depth=True) - found_bins = await spiral_bin_search(parent=self) + found_bins = await spiral_bins_search(parent=self) if found_bins: - await bin_task(parent=self) + await marker_dropper_task(parent=self) - await yaw_to_cv_object('bin_pink_front', direction=1, yaw_threshold=math.radians(15), + await yaw_to_cv_object(CVObjectType.BIN_PINK_FRONT, direction=1, yaw_threshold=math.radians(15), depth_level=1.0, parent=Task.MAIN_ID) await octagon_task(direction=1, parent=self) -@task -async def buoy_to_octagon(self: Task, direction: int = 1, move_forward: int = 0): - DEPTH_LEVEL = State().orig_depth - 0.7 - - logger.info('Started buoy to octagon') - - async def move_with_directions(directions): - await move_tasks.move_with_directions(directions, correct_yaw=False, correct_depth=True, parent=self) - - async def correct_depth() -> Coroutine[None, None, None]: - await move_tasks.correct_depth(desired_depth=DEPTH_LEVEL, parent=self) - self.correct_depth = correct_depth +@comp_task +async def buoy_to_octagon(self: CompTask, direction: int = 1, move_forward: int = 0): + logger.info('[buoy_to_octagon] Started buoy to octagon') # Move towards octagon directions = [ @@ -315,250 +577,29 @@ async def correct_depth() -> Coroutine[None, None, None]: (0, 1 * direction, 0), (move_forward, 0, 0), ] - await move_with_directions(directions) + await self.move_with_directions(directions, correct_yaw=False) -@task -async def gate_to_octagon(self: Task, direction: int = 1, move_forward: int = 0, timeout: int=30): - DEPTH_LEVEL = State().orig_depth - 1.0 - logger.info('Started gate to octagon') +@comp_task +async def gate_to_octagon(self: CompTask, depth_level: float = 1, timeout: int = 30): + DEPTH_LEVEL = State().orig_depth - depth_level - async def move_with_directions(directions, depth_level=-1.0): - await move_tasks.move_with_directions(directions, depth_level, correct_yaw=True, correct_depth=True, - keep_orientation=True, timeout=timeout, parent=self) + logger.info('[gate_to_octagon] Started gate to octagon') directions = [ (3, 0, 0), (3, 0, 0), (3, 0, 0), ] - await move_with_directions(directions, depth_level=DEPTH_LEVEL) - - -@task -async def buoy_circumnavigation_power(self: Task, depth: float = 0.7) -> Task[None, None, None]: - """ - Perform a buoy circumnavigation task with a specified depth adjustment. + await self.move_with_directions(directions, depth_level=DEPTH_LEVEL, timeout=timeout) - Args: - self (Task): The task instance. - depth (float): The depth offset to adjust the circumnavigation. Default is 0.7. - Returns: - Task[None, None, None]: The result of the circumnavigation task. - """ - DEPTH_LEVEL = State().orig_depth - depth - - def publish_power() -> None: - power = Twist() - power.linear.y = 0.9 - power.angular.z = -0.1 - Controls().set_axis_control_type(x=ControlTypes.DESIRED_POWER, y=ControlTypes.DESIRED_POWER, - yaw=ControlTypes.DESIRED_POWER) - Controls().publish_desired_power(power, set_control_types=False) - - def stabilize() -> None: - pose_to_hold = copy.deepcopy(State().state.pose.pose) - Controls().publish_desired_position(pose_to_hold) - - async def correct_depth() -> Coroutine[None, None, None]: - await move_tasks.correct_depth(desired_depth=DEPTH_LEVEL, parent=self) - - for _ in range(4): - publish_power() - logger.info('Publish power') - await util_tasks.sleep(6, parent=self) - logger.info('Sleep 5 (1)') - stabilize() - logger.info('Stabilized') - await util_tasks.sleep(5, parent=self) - logger.info('Sleep 5 (2)') - await correct_depth() - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 1, 0, 0, 0, 0), parent=self) - - -@task -async def initial_submerge(self: Task, submerge_dist: float, z_tolerance: float = 0.1, - enable_controls_flag: bool = False, timeout: int = 30) -> Task[None, None, None]: - """ - Submerge the robot a given amount. - - Args: - submerge_dist: The distance to submerge the robot in meters. - enable_controls_flag: Flag to wait for ENABLE_CONTROLS status when true. - """ - logger.info("Starting initial submerge") - - while enable_controls_flag and not Controls().enable_controls_status.data: - await Yield() - - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, submerge_dist, 0, 0, 0), - keep_orientation=True, - pose_tolerances=move_tasks.create_twist_tolerance(linear_z=z_tolerance), - timeout=timeout, - parent=self, - ) - logger.info(f'Submerged {submerge_dist} meters') - - # async def correct_roll_and_pitch(): - # imu_orientation = State().imu.orientation - # euler_angles = quat2euler([imu_orientation.w, imu_orientation.x, imu_orientation.y, imu_orientation.z]) - # roll_correction = -euler_angles[0] * 1.2 - # pitch_correction = -euler_angles[1] * 1.2 - - # logger.info(f'Roll, pitch correction: {roll_correction, pitch_correction}') - # await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, roll_correction, pitch_correction, 0), - # parent=self) - - # await correct_roll_and_pitch() - - -@task -async def coin_flip(self: Task, depth_level: float = 0.7, - enable_same_direction: bool = True, timeout: int = 15) -> Task[None, None, None]: - """ - Perform the coin flip task, adjusting the robot's yaw and depth. - - The coin flip task involves correcting the robot's yaw to return it to its original orientation and then adjusting its depth. The task continuously calculates yaw corrections based on the difference between the current and original orientations, making incremental adjustments until the yaw is within a specified threshold. After correcting yaw, the robot adjusts its depth to reach the desired level. - - Args: - self (Task): The task instance managing the execution of the coin flip task. - depth_level (float): The depth adjustment level relative to the robot's original depth. Default is 0.7. - - Returns: - Task[None, None, None]: The result of the task execution. - - Detailed Process: - 1. Calculate the desired yaw correction using the difference between the original and current IMU orientations. - 2. Gradually adjust yaw in steps, ensuring the correction does not exceed the maximum allowed yaw change. - 3. Once the yaw is corrected to within 5 degrees, adjust the robot's depth to the specified level. - 4. Log each step of the process for debugging and traceability. - - Logging: - - Logs the initial start of the coin flip task. - - Logs intermediate yaw corrections and desired yaw adjustments. - - Logs depth corrections and the final completion of the task. - - Example: - >>> await coin_flip(task_instance, depth_level=0.5) - - Notes: - - Uses `State` to access robot's current and original states, including depth and IMU orientation. - - Uses `geometry_utils` to create poses for yaw and depth corrections. - - The task continuously loops until the yaw correction is within the specified threshold (±5 degrees). - """ +@comp_task +async def slalom_task_dead_reckoning(self: CompTask, depth_level=1.1) -> Task[None, None, None]: DEPTH_LEVEL = State().orig_depth - depth_level - MAXIMUM_YAW = math.radians(30) - - def get_step_size(desired_yaw): - return min(abs(desired_yaw), MAXIMUM_YAW) - - # def get_yaw_correction(): - # orig_imu_orientation = copy.deepcopy(State().orig_imu.orientation) - # orig_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_imu_orientation)) - - # cur_imu_orientation = copy.deepcopy(State().imu.orientation) - # cur_imu_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_imu_orientation)) - - # correction = orig_imu_euler_angles[2] - cur_imu_euler_angles[2] - - # sign_correction = np.sign(correction) - # desired_yaw = sign_correction * get_step_size(correction) - # logger.info(f'Coinflip: imu_yaw_correction = {desired_yaw}') - - # return desired_yaw - - def get_gyro_yaw_correction(return_raw=True): - orig_gyro_orientation = copy.deepcopy(State().orig_gyro.pose.pose.orientation) - orig_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(orig_gyro_orientation)) - - cur_gyro_orientation = copy.deepcopy(State().gyro.pose.pose.orientation) - cur_gyro_euler_angles = quat2euler(geometry_utils.geometry_quat_to_transforms3d_quat(cur_gyro_orientation)) - - raw_correction = cur_gyro_euler_angles[2] - orig_gyro_euler_angles[2] - correction = (-raw_correction) % (2 * np.pi) - - logger.info(f'Coinflip: raw_gyro_yaw_correction = {raw_correction}') - logger.info(f'Coinflip: processed_gyro_yaw_correction = {correction}') - - if return_raw: - return raw_correction - else: - return correction - - if (enable_same_direction): - while abs(get_gyro_yaw_correction(return_raw=True)) > math.radians(5): - yaw_correction = get_gyro_yaw_correction(return_raw=False) - logger.info(f'Yaw correction: {yaw_correction}') - if (yaw_correction > np.pi): - yaw_correction -= np.pi - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, np.pi), - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.05), - timeout=timeout, - parent=self, - # TODO: maybe set yaw tolerance? - ) - - logger.info('Yaw correct 180') - - logger.info(f'Yaw correct remainder: {yaw_correction}') - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction), - parent=self, - # TODO: maybe set yaw tolerance? - ) - else: - while abs(get_gyro_yaw_correction(return_raw=True)) > math.radians(5): - yaw_correction = get_gyro_yaw_correction(return_raw=False) - logger.info(f'Yaw correction: {yaw_correction}') - - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction), - timeout=timeout, - parent=self, - # TODO: maybe set yaw tolerance? - ) - - logger.info('Step Completed') - logger.info(f'Final yaw offset: {get_gyro_yaw_correction(return_raw=True)}') - - depth_delta = DEPTH_LEVEL - State().depth - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, depth_delta, 0, 0, 0), - keep_orientation=True, - parent=self) - logger.info(f'Corrected depth {depth_delta}') - logger.info('Completed coin flip') + logger.info('[slalom_task_dead_reckoning] Started slalom task') - -@task -async def gate_task_dead_reckoning(self: Task, depth_level=0.7) -> Task[None, None, None]: - DEPTH_LEVEL = State().orig_depth - depth_level - logger.info('Started gate task') - if get_robot_name() == RobotName.OOGWAY: - # Go through gate - await move_tasks.move_with_directions([(3, 0, 0)], depth_level=depth_level, correct_depth=True, correct_yaw=True, parent=self) - await move_tasks.move_with_directions([(3, 0, 0)], depth_level=depth_level, correct_depth=True, correct_yaw=True, parent=self) - - # Dead reckon a bit to torpedo - await move_tasks.move_with_directions([(0, 3, 0)], depth_level=depth_level, correct_depth=True, correct_yaw=True, parent=self) - await move_tasks.move_with_directions([(0, 3, 0)], depth_level=depth_level, correct_depth=True, correct_yaw=True, parent=self) - elif get_robot_name() == RobotName.CRUSH: - directions = [ - (2, 0, 0), - (3, 0, 0), - ] - await move_tasks.move_with_directions(directions, depth_level=DEPTH_LEVEL, correct_depth=True, correct_yaw=True, - keep_orientation=True, timeout=15, parent=self) - logger.info('Moved through gate, and strafed.') - -@task -async def slalom_task_dead_reckoning(self: Task, depth_level=1.1) -> Task[None, None, None]: - DEPTH_LEVEL = State().orig_depth - depth_level - logger.info('Started slalom task') if get_robot_name() == RobotName.OOGWAY: pass elif get_robot_name() == RobotName.CRUSH: @@ -567,452 +608,208 @@ async def slalom_task_dead_reckoning(self: Task, depth_level=1.1) -> Task[None, (2, 0, 0), (2, 0, 0), ] - await move_tasks.move_with_directions(directions, depth_level=DEPTH_LEVEL, correct_depth=True, correct_yaw=True, - keep_orientation=True, timeout=20, parent=self) - logger.info('Moved through slalom') + await self.move_with_directions(directions, depth_level=DEPTH_LEVEL, timeout=20) + + logger.info('[slalom_task_dead_reckoning] Finished slalom task') -@task -async def slalom_to_octagon_dead_reckoning(self: Task, depth_level: float = 1.1) -> Task[None, None, None]: + +@comp_task +async def slalom_to_octagon_dead_reckoning(self: CompTask, depth_level: float = 1.1) -> Task[None, None, None]: DEPTH_LEVEL = State().orig_depth - depth_level - latency_threshold = 10 + LATENCY_THRESHOLD = 10 async def face_fish(yaw_left: bool = True, closer_banner: bool = True): direction = 1 if yaw_left else -1 - yaw_distance = np.pi/4 if closer_banner else 3*np.pi/4 + yaw_distance = np.pi / 4 if closer_banner else 3 * np.pi / 4 await orient_to_wall(parent=self) await orient_to_wall(parent=self) - await move_tasks.yaw_from_local_pose(direction*yaw_distance, parent=self) + await self.correct_yaw(direction * yaw_distance) - logger.info('Started slalom task') + logger.info('[slalom_to_octagon_dead_reckoning] Started slalom task') if get_robot_name() == RobotName.OOGWAY: pass elif get_robot_name() == RobotName.CRUSH: before_cv_directions = [ - (2,0,0), - (2,0,0), + (2, 0, 0), + (2, 0, 0), ] - await move_tasks.move_with_directions(before_cv_directions, depth_level=DEPTH_LEVEL, correct_depth=True, - correct_yaw=True, keep_orientation=True, timeout=15, parent=self) - - logger.info("Checking yellow bin detection") - MAXIMUM_YAW = math.radians(30) - step = 1 - - while not CV().is_receiving_recent_cv_data(CVObjectType.BIN_PINK_FRONT, latency_threshold): - if step <= 3: - angle = MAXIMUM_YAW - elif step == 4: - angle = -2 * MAXIMUM_YAW - elif step <= 8: - angle = -1 * MAXIMUM_YAW - else: - angle = 3 * MAXIMUM_YAW - logger.info(f'Yawed to find object more than 9 times, breaking loop.') - - logger.info(f'No {cv_object} detection, setting yaw setpoint {angle}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, angle * direction), - depth_level=depth_level, - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.3), - timeout=10, - parent=self) - - if step > 8: - break + await self.move_with_directions(before_cv_directions, depth_level=DEPTH_LEVEL, timeout=15) - step += 1 - await Yield() + logger.info("[slalom_to_octagon_dead_reckoning] Checking pink bin detection") + await yaw_until_object_detection(CVObjectType.BIN_PINK_FRONT, + depth_level=DEPTH_LEVEL, latency=LATENCY_THRESHOLD) after_cv_directions = [ - (2,0,0), + (2, 0, 0), ] - await move_tasks.move_with_directions(after_cv_directions, depth_level=DEPTH_LEVEL, correct_depth=True, - correct_yaw=True, keep_orientation=True, timeout=15, parent=self) + await self.move_with_directions(after_cv_directions, depth_level=DEPTH_LEVEL, timeout=15) await face_fish(yaw_left=False, closer_banner=False) - logger.info('Surfacing...') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, State().orig_depth - State().depth, 0, 0, 0), - timeout=10, parent=self) - logger.info('Finished surfacing') - - -@task -async def return_task_dead_reckoning(self: Task, depth_level=0.7) -> Task[None, None, None]: - logger.info('Started gate return task') - DEPTH_LEVEL = State().orig_depth - depth_level - if get_robot_name() == RobotName.OOGWAY: - pass - elif get_robot_name() == RobotName.CRUSH: - directions = [ - (-3, 0, 0), - (-3, 0, 0), - (-3, 0, 0), - (-3, 0, 0), - ] - await move_tasks.move_with_directions(directions, depth_level=DEPTH_LEVEL, correct_depth=True, correct_yaw=True, - keep_orientation=True, timeout=15, parent=self) - logger.info('Moved through gate return') - -@task -async def gate_task(self: Task, offset: int = 0, direction: int = 1) -> Task[None, None, None]: - """ - Asynchronous task to perform gate-related operations. - NOTE: This code is assuming we choose the sawfish side of the gate. - """ - logger.info('Started gate task') - depth_level = State().orig_depth - 0.7 - - async def correct_y(factor: int=1) -> None: - await cv_tasks.correct_y(prop=CVObjectType.GATE_SAWFISH, add_factor=0.2 + offset, mult_factor=factor, parent=self) + logger.info('[slalom_to_octagon_dead_reckoning] Surfacing...') - async def correct_z() -> None: - await cv_tasks.correct_z(prop=CVObjectType.GATE_SAWFISH, parent=self) + await self.correct_depth(State().orig_depth) + logger.info('[slalom_to_octagon_dead_reckoning] Finished surfacing') - async def correct_depth() -> None: - await move_tasks.correct_depth(desired_depth=depth_level, parent=self) - async def move_x(step=1) -> None: - await move_tasks.move_x(step=step, parent=self) +@comp_task +async def center_path_marker(self: CompTask, depth_level=0.5): + DEPTH_LEVEL = State().orig_depth - depth_level + PIXEL_THRESHOLD = 70 + STEP_SIZE = 0.2 - def get_step_size(dist: float) -> float: - dist_threshold = 4 - if dist > dist_threshold: + def get_step_mult_factor(dist: float, threshold: float) -> int: + if abs(dist) < threshold: + return 0 + if dist > threshold: return 1 - return max(dist-3 + 0.25, 0.25) - - logger.info('Begin sleep') - await util_tasks.sleep(2, parent=self) - logger.info('End sleep') - - gate_dist = CV().bounding_boxes[CVObjectType.GATE_SAWFISH].coords.x - # await correct_y(factor=0.5) - await correct_depth() - num_corrections = 0 - while gate_dist > 3: - await move_x(step=get_step_size(gate_dist)) - await yaw_to_cv_object(CVObjectType.GATE_SAWFISH, direction=-1, yaw_threshold=math.radians(10), - latency_threshold=2, depth_level=0.6, parent=self), - # await correct_y(factor=(0.5 if num_corrections < 0 else 1)) - await correct_depth() - await Yield() - gate_dist = CV().bounding_boxes[CVObjectType.GATE_SAWFISH].coords.x - logger.info(f'Gate dist: {gate_dist}') - num_corrections += 1 - - directions = [ - (2, 0, 0), - (0, 0.2 * direction, 0), - (2, 0, 0), - (1, 0, 0), - ] - - await move_tasks.move_with_directions(directions, correct_yaw=False, correct_depth=True, parent=self) + return -1 - logger.info('Moved through gate') + await self.correct_depth(DEPTH_LEVEL) -@task -async def yaw_to_cv_object_vel(self: Task, cv_object: CVObjectType, direction=1, - yaw_threshold=math.radians(40), latency_threshold=10, - depth_level=0.5) -> Task[None, None, None]: - """ - Corrects the yaw relative to the CV object via simple linear velocity control. - """ - DEPTH_LEVEL = State().orig_depth - depth_level - MAXIMUM_YAW = math.radians(20) - SCALE_FACTOR = 0.1 - - logger.info('Starting yaw_to_cv_object') - - async def correct_depth(): - # await move_tasks.depth_correction(DEPTH_LEVEL, parent=self) - await move_tasks.depth_correction(desired_depth=DEPTH_LEVEL, parent=self) - - async def yaw_until_object_detection(): - while not CV().is_receiving_recent_cv_data(cv_object, latency_threshold): - logger.info(f'No {cv_object} detection, setting yaw setpoint {MAXIMUM_YAW}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, MAXIMUM_YAW * direction), - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.2), - parent=self) - await correct_depth() - await Yield() + logger.info(f'[center_path_marker] Path marker pixel distances: {CV().distances[CVObjectType.PATH_MARKER]}') + pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + pixel_y = CV().distances[CVObjectType.PATH_MARKER].y - # Yaw until object detection - await correct_depth() - await yaw_until_object_detection() + count = 1 + while (max(pixel_x, pixel_y) > PIXEL_THRESHOLD or min(pixel_x, pixel_y) < -PIXEL_THRESHOLD): + await self.move_x(step=STEP_SIZE * get_step_mult_factor(pixel_x, PIXEL_THRESHOLD)) + await self.move_y(step=STEP_SIZE * get_step_mult_factor(pixel_y, PIXEL_THRESHOLD)) - logger.info(f'{cv_object} detected. Now centering {cv_object} in frame...') + logger.info(f'[center_path_marker] Path marker pixel distances: {CV().distances[CVObjectType.PATH_MARKER]}') + pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + pixel_y = CV().distances[CVObjectType.PATH_MARKER].y - # Center detected object in camera frame - cv_object_yaw = CV().bounding_boxes[cv_object].yaw # degrees - await correct_depth() - logger.info(f'abs(cv_object_yaw): {abs(cv_object_yaw)}') - logger.info(f'yaw_threshold: {yaw_threshold}') - while abs(cv_object_yaw) > yaw_threshold: - logger.info(f'Detected yaw {cv_object_yaw} is greater than threshold {yaw_threshold}. Setting yaw power to: {cv_object_yaw * SCALE_FACTOR}') - Controls().publish_desired_power(Twist(angular=Vector3(z=cv_object_yaw * SCALE_FACTOR))) - # await correct_depth() - # await Yield() + if count % 2 == 0: + logger.info('[center_path_marker] Correcting depth') + await self.correct_depth(DEPTH_LEVEL) - if (not CV().is_receiving_recent_cv_data(cv_object, latency_threshold)): - logger.info(f'{cv_object} detection lost, running yaw_until_object_detection()') - await yaw_until_object_detection() + await Yield() - cv_object_yaw = CV().bounding_boxes[cv_object].yaw + count += 1 - logger.info(f'{cv_object} centered.') + logger.info('[center_path_marker] Finished centering path marker') - await correct_depth() -@task -async def align_path_marker(self: Task, direction=1) -> Task[None, None, None]: +@comp_task +async def align_path_marker(self: CompTask, depth_level=0.5) -> Task[None, None, None]: """ Corrects the yaw relative to the CV object. Follows the yaw and center loop. """ - DEPTH_LEVEL = State().orig_depth - 0.5 + DEPTH_LEVEL = State().orig_depth - depth_level MAXIMUM_YAW = math.radians(30) YAW_THRESHOLD = math.radians(5) PIXEL_THRESHOLD = 70 - logger.info('Starting align path marker') - - async def move_x(step=1): - await move_tasks.move_x(step=step, parent=self) - - async def move_y(step=1): - await move_tasks.move_y(step=step, parent=self) - - async def correct_depth(): - await move_tasks.depth_correction(desired_depth=DEPTH_LEVEL, parent=self) + logger.info('[align_path_marker] Starting align path marker') def get_step_size(desired_yaw): - # desired yaw in rads + # desired yaw in radians return min(abs(desired_yaw), MAXIMUM_YAW) - def get_step_mult_factor(dist, threshold): - if abs(dist) < threshold: - return 0 - if dist > threshold: - return 1 - return -1 - - async def center_path_marker(pixel_threshold: float, step_size=0.20, x_offset=0, y_offset=0) -> None: - logger.info(CV().distances[CVObjectType.PATH_MARKER]) - pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + x_offset - pixel_y = CV().distances[CVObjectType.PATH_MARKER].y + y_offset - - count = 1 - while (max(pixel_x, pixel_y) > pixel_threshold or min(pixel_x, pixel_y) < -pixel_threshold): - logger.info(CV().distances[CVObjectType.PATH_MARKER]) - - await move_x(step=step_size * get_step_mult_factor(pixel_x, pixel_threshold)) - await move_y(step=step_size * get_step_mult_factor(pixel_y, pixel_threshold)) - - pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + x_offset - pixel_y = CV().distances[CVObjectType.PATH_MARKER].y + y_offset + await self.correct_depth(DEPTH_LEVEL) - if count % 3 == 0: - logger.info('Correcting depth') - await correct_depth() - - await Yield() - - logger.info(f'x: {pixel_x}, y: {pixel_y}') - - count += 1 - - logger.info('Finished centering path marker') - logger.info(f'x: {pixel_x}, y: {pixel_y}') - - logger.info('Now aligning path marker in frame...') - - # Center detected object in camera frame + logger.info('[align_path_marker] Now aligning path marker in frame...') + # Center detected path marker in camera frame path_marker_yaw = CV().bounding_boxes[CVObjectType.PATH_MARKER].yaw - await correct_depth() - logger.info(f"abs(path_marker_yaw) = '{abs(path_marker_yaw)}") - logger.info(f'yaw_threshold = {YAW_THRESHOLD}') + logger.info(f'[align_path_marker] abs(path_marker_yaw) = {abs(path_marker_yaw)}') + logger.info(f'[align_path_marker] yaw_threshold = {YAW_THRESHOLD}') while abs(path_marker_yaw) > YAW_THRESHOLD: sign_path_marker_yaw = np.sign(path_marker_yaw) correction = get_step_size(path_marker_yaw) desired_yaw = sign_path_marker_yaw * correction - logger.info(f'Detected yaw {path_marker_yaw} is greater than threshold {YAW_THRESHOLD}. Yawing: {desired_yaw}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, desired_yaw), parent=self) - await correct_depth() + # Yaw to align with path marker + logger.info(f'[align_path_marker] Detected yaw {path_marker_yaw} is greater than threshold {YAW_THRESHOLD}. ' + f'Yawing: {desired_yaw}') + await self.correct_yaw(desired_yaw, yaw_tolerance=YAW_THRESHOLD) + await self.correct_depth(DEPTH_LEVEL) + + # Recenter path marker in camera frame await center_path_marker(pixel_threshold=PIXEL_THRESHOLD) - await correct_depth() + await self.correct_depth(DEPTH_LEVEL) await Yield() path_marker_yaw = CV().bounding_boxes[CVObjectType.PATH_MARKER].yaw - logger.info('Path marker centered.') - - await correct_depth() - - -@task -async def center_path_marker(self: Task): - DEPTH_LEVEL = State().orig_depth - 0.5 - PIXEL_THRESHOLD = 70 - - async def correct_depth(desired_depth=DEPTH_LEVEL): - await move_tasks.correct_depth(desired_depth=desired_depth, parent=self) - self.correct_depth = correct_depth + logger.info('[align_path_marker] Path marker centered and aligned.') - async def move_x(step=1): - await move_tasks.move_x(step=step, parent=self) + await self.correct_depth(DEPTH_LEVEL) - async def move_y(step=1): - await move_tasks.move_y(step=step, parent=self) - - def get_step_mult_factor(dist: float, threshold: float) -> int: - if abs(dist) < threshold: - return 0 - if dist > threshold: - return 1 - return -1 - - async def center_path_marker(pixel_threshold: float, step_size=0.20, x_offset=0, y_offset=0) -> None: - logger.info(CV().distances[CVObjectType.PATH_MARKER]) - pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + x_offset - pixel_y = CV().distances[CVObjectType.PATH_MARKER].y + y_offset - - count = 1 - while (max(pixel_x, pixel_y) > pixel_threshold or min(pixel_x, pixel_y) < -pixel_threshold): - logger.info(CV().distances[CVObjectType.PATH_MARKER]) - - await move_x(step=step_size * get_step_mult_factor(pixel_x, pixel_threshold)) - await move_y(step=step_size * get_step_mult_factor(pixel_y, pixel_threshold)) - - pixel_x = CV().distances[CVObjectType.PATH_MARKER].x + x_offset - pixel_y = CV().distances[CVObjectType.PATH_MARKER].y + y_offset - - if count % 3 == 0: - logger.info('Correcting depth') - await correct_depth() - - await Yield() - logger.info(f'x: {pixel_x}, y: {pixel_y}') - - count += 1 - - logger.info('Finished centering path marker') - logger.info(f'x: {pixel_x}, y: {pixel_y}') - - await correct_depth() - await center_path_marker(pixel_threshold=PIXEL_THRESHOLD, x_offset=-120, y_offset=-120) - - -@task -async def path_marker_to_pink_bin(self: Task, maximum_distance: int = 6): - DEPTH_LEVEL = State().orig_depth - 0.5 - AREA_THRESHOLD = 1000 - LATENCY_THRESHOLD = 1 - - logger.info('Starting path marker to bins') - - async def correct_depth(desired_depth: float) -> None: - await move_tasks.correct_depth(desired_depth=desired_depth, parent=self) - - def is_receiving_bin_data(bin_object: CVObjectType, last_detection_time) -> bool: - width = CV().bounding_boxes[bin_object].width - height = CV().bounding_boxes[bin_object].height - - return width * height >= AREA_THRESHOLD \ - and CV().is_receiving_recent_cv_data(bin_object, LATENCY_THRESHOLD, last_detection_time) - - async def move_x(step: float = 1) -> None: - await move_tasks.move_x(step=step, parent=self) - - def stabilize() -> None: - pose_to_hold = copy.deepcopy(State().state.pose.pose) - Controls().publish_desired_position(pose_to_hold) - - async def move_to_bins() -> None: - count = 1 - bin_red_time = None - bin_blue_time = None - - await move_x(step=1) - - while not is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) \ - or not is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time): - bin_red_time = CV().bounding_boxes[CVObjectType.BIN_RED].header.stamp.secs - bin_blue_time = CV().bounding_boxes[CVObjectType.BIN_BLUE].header.stamp.secs - - await correct_depth(DEPTH_LEVEL) - - is_receiving_red_bin_data = is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) - is_receiving_blue_bin_data = is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time) +@comp_task +async def path_marker_to_marker_dropper_bins(self: CompTask, maximum_distance: float = 6): + DEPTH_LEVEL = State().orig_depth - 0.5 + AREA_THRESHOLD = 1000 + LATENCY_THRESHOLD = 1 - logger.info(f'Receiving red bin data: {is_receiving_red_bin_data}') - logger.info(f'Receiving blue bin data: {is_receiving_blue_bin_data}') + logger.info('[path_marker_to_marker_dropper_bins] Starting path marker to marker dropper bins') - step = 0.5 if (is_receiving_red_bin_data or is_receiving_blue_bin_data) else 1 - await move_x(step=step) + def is_receiving_bin_data(bin_object: CVObjectType, last_detection_time) -> bool: + width = CV().bounding_boxes[bin_object].width + height = CV().bounding_boxes[bin_object].height - await Yield() + return width * height >= AREA_THRESHOLD \ + and CV().is_receiving_recent_cv_data(bin_object, LATENCY_THRESHOLD, last_detection_time) - if count >= maximum_distance: - logger.info('Bin not spotted, exiting the loop...') - break + await self.move_x(step=1) - count += 1 + total_distance = 0 + bin_red_time = None + bin_blue_time = None - logger.info('Reached pink bins, stabilizing...') - stabilize() - await util_tasks.sleep(5, parent=self) + while not is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) \ + or not is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time): + bin_red_time = CV().bounding_boxes[CVObjectType.BIN_RED].header.stamp.secs + bin_blue_time = CV().bounding_boxes[CVObjectType.BIN_BLUE].header.stamp.secs - await move_to_bins() + await self.correct_depth(DEPTH_LEVEL) -@task -async def spiral_bin_search(self: Task) -> Task[None, None, None]: - DEPTH_LEVEL = State().orig_depth - 0.5 - AREA_THRESHOLD = 1000 - LATENCY_THRESHOLD = 1 + is_receiving_red_bin_data = is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) + is_receiving_blue_bin_data = is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time) - class Direction(Enum): - FORWARD = 1, - BACK = 2, - LEFT = 3, - RIGHT = 4 + logger.info(f'[path_marker_to_marker_dropper_bins] Receiving red bin data: {is_receiving_red_bin_data}') + logger.info(f'[path_marker_to_marker_dropper_bins] Receiving blue bin data: {is_receiving_blue_bin_data}') - def publish_power(direction: Direction): - power = Twist() + step = 0.5 if (is_receiving_red_bin_data or is_receiving_blue_bin_data) else 1 + await self.move_x(step=step) - if direction == Direction.FORWARD: - power.linear.x = 0.7 - elif direction == Direction.BACK: - power.linear.x = -0.7 - elif direction == Direction.LEFT: - power.linear.y = 1.0 - elif direction == Direction.RIGHT: - power.linear.y = -1.0 + await Yield() - if direction in [Direction.FORWARD, Direction.BACK]: - Controls().set_axis_control_type(x=ControlTypes.DESIRED_POWER, y=ControlTypes.DESIRED_POSITION) - elif direction in [Direction.LEFT, Direction.RIGHT]: - Controls().set_axis_control_type(x=ControlTypes.DESIRED_POSITION, y=ControlTypes.DESIRED_POWER) + total_distance += step + if total_distance >= maximum_distance: + logger.info('[path_marker_to_marker_dropper_bins] Marker dropper bins not spotted, exiting the loop...') + break - Controls().publish_desired_power(power, set_control_types=False) + logger.info('[path_marker_to_marker_dropper_bins] Reached marker dropper bins, stabilizing...') + await self.stabilize() - async def move_step(direction: Direction, steps: float): - pose = geometry_utils.create_pose(0, 0, 0, 0, 0, 0) + await util_tasks.sleep(5, parent=self) - if direction == Direction.FORWARD: - pose.position.x = steps - elif direction == Direction.BACK: - pose.position.x = -steps - elif direction == Direction.LEFT: - pose.position.y = steps - elif direction == Direction.RIGHT: - pose.position.y = -steps - await move_tasks.move_to_pose_local(pose, parent=self) +@comp_task +async def spiral_bins_search(self: CompTask, depth_level=0.5, + spiral_step_size: float = 0.5) -> Task[None, None, None] | bool: + DEPTH_LEVEL = State().orig_depth - depth_level + AREA_THRESHOLD = 1000 + LATENCY_THRESHOLD = 1 - DIRECTIONS = [ + class Direction(Enum): + FORWARD = 'forward' + BACK = 'backward' + LEFT = 'left' + RIGHT = 'right' + + MOVE_FUNC_MAPPING = { + Direction.FORWARD: lambda s: move_tasks.move_x(step=s, parent=self), + Direction.BACK: lambda s: move_tasks.move_x(step=-s, parent=self), + Direction.LEFT: lambda s: move_tasks.move_y(step=s, parent=self), + Direction.RIGHT: lambda s: move_tasks.move_y(step=-s, parent=self), + } + + SPIRAL_PATTERN: list[tuple[Direction, int]] = [ (Direction.FORWARD, 1), (Direction.LEFT, 1), (Direction.BACK, 2), @@ -1031,9 +828,6 @@ async def move_step(direction: Direction, steps: float): (Direction.RIGHT, 8), ] - async def correct_depth(desired_depth: float) -> None: - await move_tasks.correct_depth(desired_depth=desired_depth, parent=self) - def is_receiving_bin_data(bin_object: CVObjectType, last_detection_time) -> bool: width = CV().bounding_boxes[bin_object].width height = CV().bounding_boxes[bin_object].height @@ -1041,111 +835,62 @@ def is_receiving_bin_data(bin_object: CVObjectType, last_detection_time) -> bool return width * height >= AREA_THRESHOLD \ and CV().is_receiving_recent_cv_data(bin_object, LATENCY_THRESHOLD, last_detection_time) - def stabilize() -> None: - pose_to_hold = copy.deepcopy(State().state.pose.pose) - Controls().publish_desired_position(pose_to_hold) - - async def search_for_bins() -> bool: - logger.info('Searching for red/blue bins...') - bin_red_time = None - bin_blue_time = None + logger.info('[spiral_bins_search] Searching for marker dropper bins...') + bin_red_time = None + bin_blue_time = None - for direction, secs in DIRECTIONS: - bin_found = False - secs *= 0.5 - logger.info(f'Publishing power: {direction, secs}') + for direction, raw_distance in SPIRAL_PATTERN: + distance = raw_distance * spiral_step_size + logger.info(f'[spiral_bins_search] Moving {direction.value} by {distance} units') - await move_step(direction, secs) + move_task = MOVE_FUNC_MAPPING[direction](distance) - iterations = secs / 0.1 - for _ in range(int(iterations)): - bin_red_time = CV().bounding_boxes[CVObjectType.BIN_RED].header.stamp.secs - bin_blue_time = CV().bounding_boxes[CVObjectType.BIN_BLUE].header.stamp.secs + while not move_task.done: + move_task.step() - is_receiving_red_bin_data = is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) - is_receiving_blue_bin_data = is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time) - - bin_found = is_receiving_red_bin_data and is_receiving_blue_bin_data + bin_red_time = CV().bounding_boxes[CVObjectType.BIN_RED].header.stamp.secs + bin_blue_time = CV().bounding_boxes[CVObjectType.BIN_BLUE].header.stamp.secs - if bin_found: - break + is_receiving_red_bin_data = is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time) + is_receiving_blue_bin_data = is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time) - await util_tasks.sleep(0.1, parent=self) - await Yield() + if is_receiving_red_bin_data and is_receiving_blue_bin_data: + logger.info('[spiral_bins_search] Found marker dropper bins, stabilize and terminating...') + await self.stabilize() + await util_tasks.sleep(5, parent=self) + return True - await correct_depth(DEPTH_LEVEL) + await util_tasks.sleep(0.1, parent=self) + await Yield() - if bin_found: - logger.info('Found bin, terminating...') - break + await self.correct_depth(DEPTH_LEVEL) - logger.info(f'Received red bin data: {is_receiving_red_bin_data}') - logger.info(f'Received blue bin data: {is_receiving_blue_bin_data}') + logger.info(f'[spiral_bins_search] Received red: {is_receiving_bin_data(CVObjectType.BIN_RED, bin_red_time)}') + logger.info(f'[spiral_bins_search] Received blue: {is_receiving_bin_data(CVObjectType.BIN_BLUE, bin_blue_time)}') - return bin_found + logger.info('[spiral_bins_search] Spiral search completed without finding marker dropper bins, stabilizing...') + await self.stabilize() + await util_tasks.sleep(5, parent=self) - return (await search_for_bins()) + return False -@task -async def bin_task(self: Task) -> Task[None, None, None]: +@comp_task +async def marker_dropper_task(self: CompTask) -> Task[None, None, None]: """ Detects and drops markers into the red bin. Requires robot to have submerged 0.7 meters. """ - logger.info('Started bin task') START_DEPTH_LEVEL = State().orig_depth - 0.6 START_PIXEL_THRESHOLD = 70 MID_DEPTH_LEVEL = State().orig_depth - 1.0 MID_PIXEL_THRESHOLD = 30 - + YAW_THRESHOLD = math.radians(5) FRAME_AREA = 480 * 600 - TIMEOUT = Duration(seconds=240) - start_time = Clock().now() - - drop_marker = Servos().drop_marker + logger.info('[marker_dropper_task] Started marker dropper task') - async def correct_x(target: CVObjectType) -> None: - await cv_tasks.correct_x(prop=target, parent=self) - - async def correct_y(target: CVObjectType) -> None: - await cv_tasks.correct_y(prop=target, parent=self) - - async def correct_z(): - pass - - async def correct_depth(desired_depth: float) -> None: - await move_tasks.correct_depth(desired_depth=desired_depth, parent=self) - self.correct_depth = correct_depth - - async def correct_yaw() -> None: - yaw_correction = CV().angles[CVObjectType.BIN_WHOLE] - logger.info(f'Yaw correction: {yaw_correction}') - sign = 1 if yaw_correction > 0.1 else (-1 if yaw_correction < -0.1 else 0) - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction + (sign * 0.1)), - keep_orientation=True, - parent=self, - ) - logger.info('Corrected yaw') - self.correct_yaw = correct_yaw - - async def correct_roll_and_pitch() -> None: - imu_orientation = State().imu.orientation - euler_angles = quat2euler([imu_orientation.w, imu_orientation.x, imu_orientation.y, imu_orientation.z]) - roll_correction = -euler_angles[0] * 1.2 - pitch_correction = -euler_angles[1] * 1.2 - - logger.info(f'Roll, pitch correction: {roll_correction, pitch_correction}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, roll_correction, pitch_correction, 0), - parent=self) - - async def move_x(step=1): - await move_tasks.move_x(step=step, parent=self) - - async def move_y(step=1): - await move_tasks.move_y(step=step, parent=self) + start_time = Clock().now() def get_step_mult_factor(dist, threshold): if abs(dist) < threshold: @@ -1155,222 +900,188 @@ def get_step_mult_factor(dist, threshold): return -1 async def track_bin(target: CVObjectType, desired_depth, pixel_threshold, step_size=0.20, x_offset=0, y_offset=0): - logger.info(CV().distances[target]) + logger.info(f'[marker_dropper_task] Target pixel distances: {CV().distances[target]}') pixel_x = CV().distances[target].x + x_offset pixel_y = CV().distances[target].y + y_offset - width = CV().bounding_boxes[CVObjectType.BIN_RED].width - height = CV().bounding_boxes[CVObjectType.BIN_RED].height + width = CV().bounding_boxes[target].width + height = CV().bounding_boxes[target].height count = 1 while (max(pixel_x, pixel_y) > pixel_threshold or min(pixel_x, pixel_y) < -pixel_threshold) \ and width * height <= 1/3 * FRAME_AREA: - logger.info(CV().distances[target]) - - await move_x(step=step_size * get_step_mult_factor(pixel_x, pixel_threshold)) - await move_y(step=step_size * get_step_mult_factor(pixel_y, pixel_threshold)) + await self.move_x(step=step_size * get_step_mult_factor(pixel_x, pixel_threshold)) + await self.move_y(step=step_size * get_step_mult_factor(pixel_y, pixel_threshold)) - width = CV().bounding_boxes[CVObjectType.BIN_RED].width - height = CV().bounding_boxes[CVObjectType.BIN_RED].height + logger.info(f'[marker_dropper_task] Target pixel distances: {CV().distances[target]}') + width = CV().bounding_boxes[target].width + height = CV().bounding_boxes[target].height pixel_x = CV().distances[target].x + x_offset pixel_y = CV().distances[target].y + y_offset - if count % 3 == 0: - logger.info('correcting depth') - await correct_depth(desired_depth=desired_depth) - logger.info('correcting roll and pitch') - await correct_roll_and_pitch() + if count % 2 == 0: + logger.info('[marker_dropper_task] Correcting depth') + await self.correct_depth(desired_depth=desired_depth) + + logger.info('[marker_dropper_task] Correcting roll and pitch') + await self.correct_roll_and_pitch(mult_factor=1.2) if width * height >= 1/6 * FRAME_AREA and \ abs(pixel_x) < pixel_threshold * 1.75 and abs(pixel_y) < pixel_threshold * 1.75: - logger.info(f'Reached area threshold: area = {width * height}') + logger.info(f'[marker_dropper_task] Reached area threshold: area = {width * height}') break if Clock().now() - start_time > TIMEOUT: - logger.warning('Track bin timed out') + logger.warning('[marker_dropper_task] Track bin timed out') break await Yield() - logger.info(f'x: {pixel_x}, y: {pixel_y}, area: {width * height}') + logger.info(f'[marker_dropper_task] Target x: {pixel_x}, y: {pixel_y}, area: {width * height}') count += 1 - logger.info('Finished tracking bin') - logger.info(f'x: {pixel_x}, y: {pixel_y}, area: {width * height}') + logger.info('[marker_dropper_task] Finished tracking bin') + logger.info(f'[marker_dropper_task] Target x: {pixel_x}, y: {pixel_y}, area: {width * height}') - await correct_depth(desired_depth=START_DEPTH_LEVEL) + await self.correct_depth(desired_depth=START_DEPTH_LEVEL) await track_bin(target=CVObjectType.BIN_RED, desired_depth=START_DEPTH_LEVEL, pixel_threshold=START_PIXEL_THRESHOLD) - await correct_yaw() + await self.correct_yaw(CV().angles[CVObjectType.BIN_WHOLE], yaw_tolerance=YAW_THRESHOLD) - await correct_depth(desired_depth=MID_DEPTH_LEVEL) + await self.correct_depth(desired_depth=MID_DEPTH_LEVEL) await track_bin(target=CVObjectType.BIN_RED, desired_depth=MID_DEPTH_LEVEL, pixel_threshold=MID_PIXEL_THRESHOLD, step_size=0.18, y_offset=30, x_offset=25) - drop_marker(MarkerDropperStates.LEFT) - logger.info('Dropped left marker') - await util_tasks.sleep(3, parent=self) + Servos().drop_marker(MarkerDropperStates.LEFT) + logger.info('[marker_dropper_task] Dropped left marker') + await util_tasks.sleep(2, parent=self) - drop_marker(MarkerDropperStates.RIGHT) - logger.info('Dropped right marker') + Servos().drop_marker(MarkerDropperStates.RIGHT) + logger.info('[marker_dropper_task] Dropped right marker') await util_tasks.sleep(2, parent=self) - await correct_depth(desired_depth=START_DEPTH_LEVEL) - logger.info(f'Corrected depth to {START_DEPTH_LEVEL}') + await self.correct_depth(desired_depth=START_DEPTH_LEVEL) + logger.info(f'[marker_dropper_task] Corrected depth to {START_DEPTH_LEVEL}') - logger.info('Completed bin task') + logger.info('[marker_dropper_task] Completed marker dropper task') -@task -async def yaw_to_cv_object(self: Task, cv_object: CVObjectType, direction=1, - yaw_threshold=math.radians(40), latency_threshold=10, - depth_level=0.5) -> Task[None, None, None] | bool: - """ - Corrects the yaw relative to the CV object - """ - DEPTH_LEVEL = State().orig_depth - depth_level - MAXIMUM_YAW = math.radians(35) - SCALE_FACTOR = 0.4 # How much the correction should be scaled down from yaw calculation - logger.info('Starting yaw_to_cv_object') +@comp_task +async def torpedo_task(self: CompTask, first_target: CVObjectType, + depth_level=0.5, direction=1) -> Task[None, None, None]: + assert first_target in [CVObjectType.TORPEDO_REEF_SHARK_TARGET, CVObjectType.TORPEDO_SAWFISH_TARGET], \ + f'Invalid first_animal: {first_target}. ' + 'Must be CVObjectType.TORPEDO_REEF_SHARK_TARGET or CVObjectType.TORPEDO_SAWFISH_TARGET' - async def correct_depth(): - # await move_tasks.depth_correction(DEPTH_LEVEL, parent=self) - await move_tasks.depth_correction(desired_depth=DEPTH_LEVEL, parent=self) + logger.info('[torpedo_task] Starting torpedo task') - def get_step_size(desired_yaw): - # desired yaw in radians - return min(abs(desired_yaw), MAXIMUM_YAW) + DEPTH_LEVEL = State().orig_depth - depth_level - def get_yaw_threshold(desired_yaw, cv_x): - if cv_x < 128 or cv_x > 512: - yaw_threshold = desired_yaw * 2 - if cv_x < 256 or cv_x > 384: - yaw_threshold = desired_yaw * 1.75 + def get_step_size(dist: float, dist_threshold: float) -> float: + if dist > 10: + goal_step = 4 + elif dist > 6: + goal_step = 3 + elif dist > 4: + goal_step = 1 + elif dist > 1.5: + goal_step = 0.5 else: - yaw_threshold = desired_yaw * 1.35 + goal_step = 0.25 + return min(dist - dist_threshold + 0.1, goal_step) - return yaw_threshold + async def move_to_torpedo(torpedo_dist_threshold=2): + await yaw_to_cv_object(CVObjectType.TORPEDO_BANNER, direction=direction, yaw_threshold=math.radians(15), + depth_level=depth_level, parent=self) - async def yaw_until_object_detection(): - logger.info("Beginning yaw_util_object_detection task") - MAXIMUM_YAW = math.radians(30) - step = 1 - while not CV().is_receiving_recent_cv_data(cv_object, latency_threshold): - if step <= 3: - angle = MAXIMUM_YAW - elif step == 4: - angle = -2 * MAXIMUM_YAW - elif step <= 8: - angle = -1 * MAXIMUM_YAW - else: - angle = 3 * MAXIMUM_YAW - logger.info(f'Yawed to find object more than 9 times, breaking loop.') + torpedo_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - logger.info(f'No {cv_object} detection, setting yaw setpoint {angle}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, angle * direction), - depth_level=depth_level, - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.3), - timeout=10, - parent=self) + await self.correct_y_to_cv_obj(CVObjectType.TORPEDO_BANNER) + await self.correct_depth(DEPTH_LEVEL) - if step > 8: - return False + while torpedo_dist > torpedo_dist_threshold: + logger.info(f'[torpedo_task] Torpedo dist x: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x}') + logger.info(f'[torpedo_task] Torpedo dist y: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.y}') + await self.move_x(step=get_step_size(torpedo_dist, torpedo_dist_threshold)) - # await correct_depth() - step += 1 - await Yield() - return True + await yaw_to_cv_object(CVObjectType.TORPEDO_BANNER, direction=-1, yaw_threshold=math.radians(15), + depth_level=depth_level, parent=self) + logger.info('[torpedo_task] Yaw corrected') - def robust_yaw_average(yaws): - yaws_sorted = sorted(yaws) - trimmed = yaws_sorted[1:-1] # remove min and max - return sum(trimmed) / len(trimmed) + await self.correct_y_to_cv_obj(CVObjectType.TORPEDO_BANNER) - # Yaw until object detection - # await correct_depth() - found = await yaw_until_object_detection() + if torpedo_dist < 3: + await self.correct_z_to_cv_obj(CVObjectType.TORPEDO_BANNER) # CV-based z-axis correction + else: + await self.correct_depth(DEPTH_LEVEL) - # Could not find, so just surface and pray... - if not found: - return False + await Yield() + torpedo_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - logger.info(f'{cv_object} detected. Now centering {cv_object} in frame...') + logger.info(f'[torpedo_task] Finished moving forwards to torpedo: {torpedo_dist}m away') - # Center detected object in camera frame + await self.correct_z_to_cv_obj(CVObjectType.TORPEDO_BANNER) # CV-based z-axis correction + + await move_to_torpedo() + logger.info('[torpedo_task] Finished moving forwards to torpedo') - # Take average of 5 numbers and use that for yaw to try and offset outliers - logger.info('Taking in 5 frames to calculate yaw offset') - cv_yaws = [] - for i in range(0,5): - cv_yaws.append(CV().bounding_boxes[cv_object].yaw) - await util_tasks.sleep(0.25, parent = self) + # Small offset to counteract camera positioning + await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, -0.5, 0.2, 0, 0, 0), parent=self) - cv_object_yaw = robust_yaw_average(cv_yaws) + # Determine which animal to target first and which to target second + if first_target == CVObjectType.TORPEDO_REEF_SHARK_TARGET: + second_target = CVObjectType.TORPEDO_SAWFISH_TARGET + elif first_target == CVObjectType.TORPEDO_SAWFISH_TARGET: + second_target = CVObjectType.TORPEDO_REEF_SHARK_TARGET - await correct_depth() - logger.info(f'abs(cv_object_yaw): {abs(cv_object_yaw)}') - logger.info(f'yaw_threshold: {yaw_threshold}') + # Center to first animal + animal = first_target + target_y = CV().bounding_boxes[animal].coords.y + target_z = CV().bounding_boxes[animal].coords.z + 0.1 - step = 1 - while abs(cv_object_yaw) > get_yaw_threshold(yaw_threshold, CV().bounding_boxes[cv_object].coords.x): - # If we have made 3 corrections already, trust that yaw is reasonable and continue forward - if step > 3: - logger.info(f'Yaw has been correcting more than 3 times, breaking loop.') - break + logger.info(f'[torpedo_task] Aligning to first target {animal} at y={target_y} and z={target_z}') + await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, target_y, target_z, 0, 0, 0), parent=self) - sign_cv_object_yaw = np.sign(cv_object_yaw) - correction = get_step_size(SCALE_FACTOR * cv_object_yaw) # Scale down CV yaw value + # Fire first torpedo + logger.info('[torpedo_task] Firing RIGHT torpedo') + await Servos().fire_torpedo(TorpedoStates.RIGHT) - # Robot agnostic code base fails once again - if get_robot_name() == RobotName.OOGWAY: - desired_yaw = sign_cv_object_yaw * correction - else: - desired_yaw = -1 * sign_cv_object_yaw * correction - - # actually do the yaw itself, and then correct depth - logger.info(f'Detected yaw {cv_object_yaw} is greater than threshold {yaw_threshold}. Actually yawing: {desired_yaw}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, desired_yaw), - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.15), - parent=self) - await correct_depth() - await Yield() + # Wait for torpedo servo to be available + await util_tasks.sleep(3, parent=self) - # over correction - if (not CV().is_receiving_recent_cv_data(cv_object, latency_threshold)): - logger.info(f'{cv_object} detection lost, running yaw_until_object_detection()') - await yaw_until_object_detection() + # Move back to see full banner + await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, -target_y, -target_z, 0, 0, 0), parent=self) - # recalculate the yaw - logger.info('Taking in 5 frames to calculate yaw offset') - cv_yaws = [] - for i in range(0,5): - cv_yaws.append(CV().bounding_boxes[cv_object].yaw) - await util_tasks.sleep(0.25, parent = self) + # Center to second animal + animal = second_target + target_y = CV().bounding_boxes[animal].coords.y - 0.1 + target_z = CV().bounding_boxes[animal].coords.z + 0.1 + logger.info(f'[torpedo_task] Aligning to second target {animal} at y={target_y} and z={target_z}') + await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, target_y, target_z, 0, 0, 0), parent=self) - cv_object_yaw = robust_yaw_average(cv_yaws) - step += 1 + # Fire second torpedo + logger.info('[torpedo_task] Firing LEFT torpedo') + await Servos().fire_torpedo(TorpedoStates.LEFT) - logger.info(f'{cv_object} centered, or limit has been reached.') + logger.info(f'[torpedo_task] Torpedo task completed') - await correct_depth() - return True -@task -async def octagon_task(self: Task, direction: int = 1) -> Task[None, None, None]: +@comp_task +async def octagon_task(self: CompTask, direction: int = 1) -> Task[None, None, None]: """ - Detects, move towards the yellow bins, then surfaces inside the octagon. Requires robot to have submerged 0.7 meters. + Detects, move towards the yellow bins, then surfaces inside the octagon. + Requires robot to have submerged 0.7 meters. """ - logger.info('Starting octagon task') + logger.info('[octagon_task] Starting octagon task') - DEPTH_LEVEL_AT_BINS = State().orig_depth - 1.2 # Depth for beginning of task and corrections during forward movement - DEPTH_LEVEL_ABOVE_BINS = State().orig_depth - 0.9 # Depth for going above bin before forward move - LATENCY_THRESHOLD = 2 # Latency for seeing the bottom bin - CONTOUR_SCORE_THRESHOLD = 2000 # Required bottom bin area for valid detection - - SCORE_THRESHOLD = 7500 # Area of bin before beginning surface logic for front camera - POST_FRONT_THRESHOLD_FORWARD_DISTANCE = 0.4 # in meters + DEPTH_LEVEL_AT_BINS = State().orig_depth - 1.2 # Depth for task beginning and corrections during forward movement + DEPTH_LEVEL_ABOVE_BINS = State().orig_depth - 0.9 # Depth for going above bin before forward move + LATENCY_THRESHOLD = 2 # Latency for seeing the bottom bin + CONTOUR_SCORE_THRESHOLD = 2000 # Required bottom bin area for valid detection + SCORE_THRESHOLD = 7500 # Area of bin before beginning surface logic for front camera + POST_FRONT_THRESHOLD_FORWARD_DISTANCE = 0.4 # in meters # Forward navigation case constants LOW_SCORE = 1500 @@ -1381,48 +1092,13 @@ async def octagon_task(self: Task, direction: int = 1) -> Task[None, None, None] HIGH_STEP_SIZE = 0.35 VERY_HIGH_STEP_SIZE = 0.2 - # LOW_SCORE = 1000 - # LOW_STEP_SIZE = 1.75 - # MED_SCORE = 2000 - # MED_STEP_SIZE = 1.25 - # HIGH_SCORE = 4000 - # HIGH_STEP_SIZE = 0.75 - # VERY_HIGH_STEP_SIZE = 0.25 - - async def correct_depth(desired_depth): - if abs(State().depth - desired_depth) < 0.15: - logger.info(f'Depth delta is below threshold, skipping.') - return - await move_tasks.correct_depth(desired_depth=desired_depth, parent=self) - - async def correct_yaw(): - yaw_correction = CV().bounding_boxes[CVObjectType.BIN_PINK_FRONT].yaw - logger.info(f'Yaw correction: {yaw_correction}') - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction * 0.7), - keep_orientation=True, - parent=self, - ) - logger.info('Corrected yaw') - - self.correct_yaw = correct_yaw - def is_receiving_pink_bin_data(latest_detection_time): - return latest_detection_time and CVObjectType.BIN_PINK_BOTTOM in CV().bounding_boxes and \ + return latest_detection_time and \ CV().bounding_boxes[CVObjectType.BIN_PINK_BOTTOM].score >= CONTOUR_SCORE_THRESHOLD and \ - Clock().now().seconds_nanoseconds()[0] - CV().bounding_boxes[CVObjectType.BIN_PINK_BOTTOM].header.stamp.sec < LATENCY_THRESHOLD and \ - abs(CV().bounding_boxes[CVObjectType.BIN_PINK_BOTTOM].header.stamp.secs - latest_detection_time) < LATENCY_THRESHOLD - - async def move_x(step=1) -> None: - await move_tasks.move_x(step=step, parent=self) - - def stabilize() -> None: - pose_to_hold = copy.deepcopy(State().state.pose.pose) - Controls().publish_desired_position(pose_to_hold) + CV().is_receiving_recent_cv_data(CVObjectType.BIN_PINK_BOTTOM, LATENCY_THRESHOLD, latest_detection_time) def get_step_size(last_step_size): bin_pink_score = CV().bounding_boxes[CVObjectType.BIN_PINK_FRONT].score - step = 0 if bin_pink_score < LOW_SCORE: step = LOW_STEP_SIZE @@ -1436,319 +1112,91 @@ def get_step_size(last_step_size): return min(step, last_step_size) async def move_to_pink_bins() -> None: - logger.info("Beginning move to bins") - - count = 1 - latest_detection_time = None - moved_above = False + logger.info('[octagon_task] Beginning move to pink bins') - if not is_receiving_pink_bin_data(latest_detection_time): - latency_threshold = 10 - MAXIMUM_YAW = math.radians(30) - step = 1 - while not CV().is_receiving_recent_cv_data(CVObjectType.BIN_PINK_FRONT, latency_threshold): - if step <= 3: - angle = MAXIMUM_YAW - elif step == 4: - angle = -2 * MAXIMUM_YAW - elif step <= 8: - angle = -1 * MAXIMUM_YAW - else: - angle = 3 * MAXIMUM_YAW - logger.info(f'Yawed to find object more than times, breaking loop.') - - logger.info(f'No {CVObjectType.BIN_PINK_FRONT} detection, setting yaw setpoint {angle}') - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, angle * direction), - depth_level=DEPTH_LEVEL_AT_BINS, - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.3), - timeout=10, - parent=self) - - if step > 8: - await move_tasks.move_to_pose_local(geometry_utils.create_pose(0.5, 0, 0, 0, 0, 0), - depth_level=DEPTH_LEVEL_AT_BINS, - pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.3), - timeout=10, - parent=self) - break + pink_bin_found = await yaw_until_object_detection(CVObjectType.BIN_PINK_FRONT, + depth_level=DEPTH_LEVEL_AT_BINS, direction=direction) - # await correct_depth() - step += 1 - await Yield() + if not pink_bin_found: + await move_tasks.move_to_pose_local( + geometry_utils.create_pose(0.5, 0, 0, 0, 0, 0), + depth_level=DEPTH_LEVEL_AT_BINS, + pose_tolerances=move_tasks.create_twist_tolerance(angular_yaw=0.3), + timeout=10, + parent=self + ) + latest_detection_time = None + moved_above = False last_step_size = float('inf') + while not is_receiving_pink_bin_data(latest_detection_time) and not moved_above: - if CVObjectType.BIN_PINK_BOTTOM in CV().bounding_boxes: - latest_detection_time = CV().bounding_boxes[CVObjectType.BIN_PINK_BOTTOM].header.stamp.sec - check = is_receiving_pink_bin_data(latest_detection_time) - if check: - logger.info(f'Bottom camera detects bin, ending loop.') - break + latest_detection_time = CV().bounding_boxes[CVObjectType.BIN_PINK_BOTTOM].header.stamp.secs + if is_receiving_pink_bin_data(latest_detection_time): + logger.info('[octagon_task] Bottom camera detects pink bin, ending loop.') + break + + logger.info('[octagon_task] Have not moved above, depth correcting and beginning sequence.') + await self.correct_depth(DEPTH_LEVEL_AT_BINS if not moved_above else DEPTH_LEVEL_ABOVE_BINS, + skip_threshold=0.15) - logger.info("Have not moved above, depth correcting and beginning sequence.") - await correct_depth(DEPTH_LEVEL_AT_BINS if not moved_above else DEPTH_LEVEL_ABOVE_BINS) if not moved_above: - logger.info("Yawing to pink bin front") - status = await yaw_to_cv_object(CVObjectType.BIN_PINK_FRONT, direction=direction, yaw_threshold=math.radians(15), - depth_level=1.2, parent=Task.MAIN_ID) - # At this point, we cannot find the object anymore. we hope to surface in the octagon atp + logger.info('[octagon_task] Yawing to pink bin front') + status = await yaw_to_cv_object(CVObjectType.BIN_PINK_FRONT, direction=direction, + yaw_threshold=math.radians(15), depth_level=1.2, parent=Task.MAIN_ID) + + # At this point, we cannot find the object anymore. We hope to surface in the octagon at this point if not status: - logger.info("Yaw failed, ending.") + logger.info('[octagon_task] Yaw failed, ending.') break - logger.info(f'Bin pink front score: {CV().bounding_boxes[CVObjectType.BIN_PINK_FRONT].score}') + logger.info(f'[octagon_task] Bin front score: {CV().bounding_boxes[CVObjectType.BIN_PINK_FRONT].score}') if CV().bounding_boxes[CVObjectType.BIN_PINK_FRONT].score > SCORE_THRESHOLD and not moved_above: - logger.info(f'Beginning to move above bin') - await correct_depth(DEPTH_LEVEL_ABOVE_BINS) + logger.info('[octagon_task] Beginning to move above bin') + await self.correct_depth(DEPTH_LEVEL_ABOVE_BINS, skip_threshold=0.15) + + logger.info('[octagon_task] Moved above pink bins, based on front camera.') moved_above = True - logger.info('Moved above pink bins, based on front camera.') if not moved_above: step = get_step_size(last_step_size) - logger.info(f'Moving step size {step}') - await move_x(step=step) - logger.info(f'Finished forwards move') + + logger.info(f'[octagon_task] Moving step size {step}') + await self.move_x(step=step) + last_step_size = step await Yield() - count += 1 - - logger.info(f'Receiving bottom bin data: {is_receiving_pink_bin_data(latest_detection_time)}') + logger.info(f'[octagon_task] Receiving bottom bin: {is_receiving_pink_bin_data(latest_detection_time)}') if moved_above: - await move_tasks.move_with_directions([(POST_FRONT_THRESHOLD_FORWARD_DISTANCE, 0, 0)], parent=self) + await self.move_x(step=POST_FRONT_THRESHOLD_FORWARD_DISTANCE) else: - logger.info('Detected bin_pink_bottom, or yaw has failed (and copium ensues)') + logger.info('[octagon_task] Detected bin_pink_bottom, or yaw has failed.') - logger.info('Reached pink bins, stabilizing...') - stabilize() + logger.info('[octagon_task] Reached pink bins, stabilizing...') + await self.stabilize() await util_tasks.sleep(5, parent=self) async def face_fish(yaw_left: bool = True, closer_banner: bool = True): direction = 1 if yaw_left else -1 - yaw_distance = np.pi/4 if closer_banner else 3*np.pi/4 + yaw_distance = np.pi / 4 if closer_banner else 3 * np.pi / 4 await orient_to_wall(parent=self) - await move_tasks.yaw_from_local_pose(direction*yaw_distance, parent=self) + await self.correct_yaw(direction * yaw_distance) await move_to_pink_bins() await face_fish(yaw_left=True, closer_banner=True) - logger.info('Surfacing...') + logger.info('[octagon_task] Surfacing...') await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, State().orig_depth - State().depth, 0, 0, 0), timeout=10, parent=self) - logger.info('Finished surfacing') - -@task -async def torpedo_task(self: Task, first_target: CVObjectType, depth_level=0.5, direction=1) -> Task[None, None, None]: - # await scan_for_torpedo() - logger.info('Starting torpedo task') - DEPTH_LEVEL = State().orig_depth - depth_level - - async def correct_y() -> Coroutine[None, None, None]: - await cv_tasks.correct_y(prop=CVObjectType.TORPEDO_BANNER, parent=self) - - async def correct_z() -> Coroutine[None, None, None]: - await cv_tasks.correct_z(prop=CVObjectType.TORPEDO_BANNER, parent=self) - - async def correct_depth() -> Coroutine[None, None, None]: - await move_tasks.correct_depth(desired_depth=DEPTH_LEVEL, parent=self) - - async def move_x(step: float = 1) -> Coroutine[None, None, None]: - logger.info(f"Moving forward {step}") - await move_tasks.move_x(step=step, parent=self) - - def get_step_size(dist:float, dist_threshold:float) -> float: - goal_step = 0.25 - if dist > 10: - goal_step = 4 - elif dist > 6: - goal_step = 3 - elif dist > 4: - goal_step = 1 - elif dist > 1.5: - goal_step = 0.5 - return min(dist - dist_threshold + 0.1, goal_step) - - async def move_to_torpedo(torpedo_dist_threshold=2): - await yaw_to_cv_object(CVObjectType.TORPEDO_BANNER, direction=direction, yaw_threshold=math.radians(15), - depth_level=depth_level, parent=self) - torpedo_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - await correct_y() - await correct_depth() - - while torpedo_dist > torpedo_dist_threshold: - logger.info(f"Torpedo dist x: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x}") - logger.info(f"Torpedo dist y: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.y}") - await move_x(step=get_step_size(torpedo_dist, torpedo_dist_threshold)) - - await yaw_to_cv_object(CVObjectType.TORPEDO_BANNER, direction=-1, yaw_threshold=math.radians(15), - depth_level=depth_level, parent=self) - logger.info(f"Yaw corrected") - await correct_y() - - if torpedo_dist < 3: - await correct_z() # CV-based z-axis correction - else: - await correct_depth() - - await Yield() - torpedo_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - - logger.info(f"Finished moving forwards to torpedo: {torpedo_dist}m away") - - await correct_z() - - - await move_to_torpedo() - logger.info(f"Finished moving forwards to torpedo") - - # Small offset to counteract camera positioning - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, -0.5, 0.2, 0, 0, 0), - parent=self, - ) - - # Determine which animal to target first and which to target second - if first_target == CVObjectType.TORPEDO_REEF_SHARK_TARGET: - second_target = CVObjectType.TORPEDO_SAWFISH_TARGET - elif first_target == CVObjectType.TORPEDO_SAWFISH_TARGET: - second_target = CVObjectType.TORPEDO_REEF_SHARK_TARGET - else: - raise ValueError(f"Invalid first_animal: {first_target}. Must be CVObjectType.TORPEDO_REEF_SHARK_TARGET or CVObjectType.TORPEDO_SAWFISH_TARGET") - - # Center to first animal - animal = first_target - target_y = CV().bounding_boxes[animal].coords.y - target_z = CV().bounding_boxes[animal].coords.z+0.1 - logger.info(f"Aligning to first target {animal} at y={target_y} and z={target_z}") - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, target_y, target_z, 0, 0, 0), - parent=self, - ) - - # Fire first torpedo - logger.info(f"Firing torpedo RIGHT") - await Servos().fire_torpedo(TorpedoStates.RIGHT) - - # Wait for torpedo servo is available - await util_tasks.sleep(Duration(seconds=3), parent=self) - - # Move back to see full banner - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, -target_y, -target_z, 0, 0, 0), - parent=self, - ) - - # Center to second animal - animal = second_target - target_y = CV().bounding_boxes[animal].coords.y - 0.1 - target_z = CV().bounding_boxes[animal].coords.z + 0.1 - logger.info(f"Aligning to second target {animal} at y={target_y} and z={target_z}") - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, target_y, target_z, 0, 0, 0), - parent=self, - ) - - # Fire second torpedo - logger.info(f"Firing torpedo LEFT") - await Servos().fire_torpedo(TorpedoStates.LEFT) - - logger.info(f"Torpedo task completed") - -@task -async def torpedo_task_old(self: Task, - target_animal: Literal[CVObjectType.TORPEDO_SAWFISH, - CVObjectType.TORPEDO_REEF_SHARK] = CVObjectType.TORPEDO_REEF_SHARK, - depth_level: float = 0.5) -> Task[None, None, None]: - """ - TODO: Test this task - """ - step_size = 1 - shooting_distance = 1 - - async def correct_depth() -> None: - await move_tasks.correct_depth(desired_depth=depth_level, parent=self) + logger.info('[octagon_task] Finished surfacing') - async def yaw_to_torpedo_banner() -> None: - await yaw_to_cv_object(CVObjectType.TORPEDO_BANNER, direction=1, yaw_threshold=math.radians(15), - depth_level=depth_level, parent=Task.MAIN_ID) - - def get_step_size(dist: float) -> float: - return min(dist - step_size, shooting_distance) - - async def correct_y() -> Coroutine[None, None, None]: - await cv_tasks.correct_y(prop=CVObjectType.TORPEDO_BANNER, parent=self) - - async def correct_z() -> Coroutine[None, None, None]: - await cv_tasks.correct_z(prop=CVObjectType.TORPEDO_BANNER, parent=self) - - async def move_x(step:float = 1) -> Coroutine[None, None, None]: - await move_tasks.move_x(step=step, parent=self) - - # async def correct_yaw_with_depthai() -> None: - # yaw_correction = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].yaw * 0.3 - # logger.info(f'Yaw correction: {yaw_correction}') - # sign = 1 if yaw_correction > 0.1 else (-1 if yaw_correction < -0.1 else 0) - # await move_tasks.move_to_pose_local( - # geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_correction + (sign * 0.1)), - # keep_orientation=True, - # parent=self, - # ) - # logger.info('Corrected yaw') - - async def move_to_torpedo_banner(): - await yaw_to_torpedo_banner() - # await correct_depth() - - banner_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - logger.info(f'banner x dist: {banner_dist}') - while banner_dist > shooting_distance: - logger.info(f'step_size: {get_step_size(banner_dist)}') - await move_x(step=get_step_size(banner_dist)) - await yaw_to_torpedo_banner() - logger.info(f"Torpedo banner dist: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x}") - await correct_y() - if banner_dist < 3: - await correct_z() - # else: - # await correct_depth() - await Yield() - banner_dist = CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x - logger.info(f"Torpedo banner dist: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.x}") - - await move_to_torpedo_banner() - - async def center_with_torpedo_target(): - # await correct_yaw_with_depthai() # Hopefully we know we are normal to the torpedo banner now and also centered with the banner. - # await correct_depth() - target_dist_y = CV().bounding_boxes[target_animal].coords.y - target_dist_z = CV().bounding_boxes[target_animal].coords.z - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, target_dist_y, target_dist_z, 0, 0, 0), - keep_orientation=True, - parent=self, - ) - logger.info(f'Centered on torpedo target, y: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.y}, \ - z: {CV().bounding_boxes[CVObjectType.TORPEDO_BANNER].coords.z}') - - # Move the Z to account for distance between camera and torpedo launcher - offset = -0.05 - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, offset, 0, 0, 0), - keep_orientation=True, - parent=self, - ) - - logger.info('FIRING TORPEDO') - # await Servos().fire_torpedo(TorpedoStates.RIGHT) - - await center_with_torpedo_target() - -@task -async def orient_to_wall(self: Task[None, None, None], +@comp_task +async def orient_to_wall(self: CompTask[None, None, None], end_angle: float = 15.0, start_angle: float = -15.0, distance: float = 10.0) -> Task[None, None, None]: @@ -1762,16 +1210,19 @@ async def get_sonar_normal_angle() -> float | None: # Call sonar sweep request sonar_future = Sonar().sweep(start_angle, end_angle, distance) if sonar_future is not None: - logger.info("Sonar sweep request sent, waiting for response...") + logger.info('[orient_to_wall] Sonar sweep request sent, waiting for response...') + # Wait for the sonar sweep to complete sonar_response = await sonar_future - logger.info(f"Sonar sweep response received: {sonar_response}") + logger.info(f'[orient_to_wall] Sonar sweep response received: {sonar_response}') + if not sonar_response.is_object: - logger.info(f"Did not get valid object from scan") + logger.info('[orient_to_wall] Did not get valid object from scan') return None + return sonar_response.normal_angle else: - logger.warning("Sonar sweep request failed - bypass mode or service unavailable") + logger.warning('[orient_to_wall] Sonar sweep request failed - bypass mode or service unavailable') return math.nan def convert_sonar_output_to_yaw(sonar_normal: float) -> float: @@ -1787,20 +1238,39 @@ def convert_sonar_output_to_yaw(sonar_normal: float) -> float: while not got_valid_sweep and count > 0: count -= 1 + sonar_output = await get_sonar_normal_angle() + if sonar_output is None: - logger.info(f"Did not get valid sweep object, trying again {count} times") + logger.info(f'[orient_to_wall] Did not get valid sweep object, trying again {count} times') continue got_valid_sweep = True if not math.isnan(sonar_output): yaw_delta = convert_sonar_output_to_yaw(sonar_output) - logger.info(f"Yaw delta from sonar sweep: {yaw_delta} radians") + logger.info(f'[orient_to_wall] Yaw delta from sonar sweep: {yaw_delta} radians') + # Move to the desired yaw angle - await move_tasks.move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw_delta), - timeout=10, - parent=self, - ) + await self.correct_yaw(yaw_delta, timeout=10) else: - logger.warning("No yaw delta received from sonar sweep, skipping orientation.") \ No newline at end of file + logger.warning('[orient_to_wall] No yaw delta received from sonar sweep, skipping orientation.') + + +@comp_task +async def return_task_dead_reckoning(self: CompTask, depth_level=0.7) -> Task[None, None, None]: + DEPTH_LEVEL = State().orig_depth - depth_level + + logger.info('[return_task_dead_reckoning] Started gate return task') + + if get_robot_name() == RobotName.OOGWAY: + pass + elif get_robot_name() == RobotName.CRUSH: + directions = [ + (-3, 0, 0), + (-3, 0, 0), + (-3, 0, 0), + (-3, 0, 0), + ] + await self.move_with_directions(directions, depth_level=DEPTH_LEVEL, timeout=15) + + logger.info('[return_task_dead_reckoning] Moved through gate return') diff --git a/onboard/src/task_planning/task_planning/tasks/move_tasks.py b/onboard/src/task_planning/task_planning/tasks/move_tasks.py index 4ad1188e..3c1a0d4d 100644 --- a/onboard/src/task_planning/task_planning/tasks/move_tasks.py +++ b/onboard/src/task_planning/task_planning/tasks/move_tasks.py @@ -112,30 +112,6 @@ def send_transformer(local_pose: Pose | None) -> Pose | None: move_to_pose_global(global_pose, pose_tolerances=pose_tolerances, timeout=timeout, parent=self), send_transformer=send_transformer) -# TODO: deprecate this, use correct_yaw from base_comp_task instead -@task -async def yaw_from_local_pose(self: Task, yaw: int, timeout: int = 30) -> None: - """ - Yaw from current local position to some offset yaw - - Args: - self (Task): The task instance on which the method is called. - yaw (int): The amount to yaw, in radian. - time_limit (int, optional): The time limit (in seconds) for reaching the pose. Defaults to 30. - - Returns: - None - """ - - logger.info(f'Yawing {yaw} from current position') - await move_to_pose_local( - geometry_utils.create_pose(0, 0, 0, 0, 0, yaw), - keep_orientation=True, - time_limit = timeout, - parent=self, - ) - logger.info(f'Finished yaw') - @task async def move_with_velocity(_self: Task, twist: Twist) -> Task[None, Twist | None, None]: @@ -216,11 +192,12 @@ async def depth_correction(self: Task, desired_depth: float) -> Task[None, None, logger.info(f'Started depth correction {depth_delta}') await move_to_pose_local( geometry_utils.create_pose(0, 0, depth_delta, 0, 0, 0), - pose_tolerances=create_twist_tolerance(linear_z=0.17), + pose_tolerances=create_twist_tolerance(linear_z=0.1), timeout=15, parent=self) logger.info(f'Finished depth correction {depth_delta}') + @task async def move_x(self: Task, step: float = 1.0) -> None: """ @@ -242,6 +219,7 @@ async def move_x(self: Task, step: float = 1.0) -> None: parent=self) logger.info(f'Moved x {step}') + @task async def move_y(self: Task, step: float = 1.0) -> None: """ @@ -268,7 +246,6 @@ async def move_y(self: Task, step: float = 1.0) -> None: async def move_with_directions(self: Task, directions: Directions, depth_level: float | None = None, - yaw_heading: float | None = None, correct_yaw: bool = False, correct_depth: bool = False, keep_orientation: bool = False, @@ -286,6 +263,8 @@ async def move_with_directions(self: Task, directions (Directions): A list of tuples, where each tuple specifies the target pose. - Tuples of length 3 represent (x, y, z). - Tuples of length 6 represent (x, y, z, roll, pitch, yaw). + depth_level (float, optional): The depth, as provided by the pressure sensor, the robot should move to. If this + is not None, the Z value of the provided pose will be overridden. Defaults to None. correct_yaw (bool, optional): If True, corrects the yaw after moving to a pose. Defaults to False. correct_depth (bool, optional): If True, corrects the depth after moving to a pose. Defaults to False. keep_orientation (bool, optional): If True, corrects orientation after moving to a pose. Defaults to False. @@ -314,7 +293,7 @@ async def move_with_directions(self: Task, if correct_yaw: logger.info(f'Correcting yaw {orig_gyro - State().gyro_euler_angles.z}') - await move_to_pose_local(geometry_utils.create_pose(0,0,0,0,0,orig_gyro - State().gyro_euler_angles.z), + await move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, orig_gyro - State().gyro_euler_angles.z), timeout=timeout, parent=self) if correct_depth: diff --git a/onboard/src/task_planning/task_planning/tasks/sonar_tasks.py b/onboard/src/task_planning/task_planning/tasks/sonar_tasks.py index f0118cce..16cc18d2 100644 --- a/onboard/src/task_planning/task_planning/tasks/sonar_tasks.py +++ b/onboard/src/task_planning/task_planning/tasks/sonar_tasks.py @@ -1,9 +1,9 @@ from typing import cast +from custom_msgs.srv import SonarSweepRequest from rclpy.logging import get_logger from task_planning.interface.sonar import Sonar from task_planning.task import Task, task -from custom_msgs.srv import SonarSweepRequest logger = get_logger('sonar_tasks')