diff --git a/tinynav/platforms/cmd_vel_control.py b/tinynav/platforms/cmd_vel_control.py index 8ad76263..c530d623 100644 --- a/tinynav/platforms/cmd_vel_control.py +++ b/tinynav/platforms/cmd_vel_control.py @@ -3,14 +3,28 @@ from geometry_msgs.msg import Twist from nav_msgs.msg import Path from nav_msgs.msg import Odometry -from scipy.spatial.transform import Rotation as R import numpy as np import logging import time +from tinynav.core.math_utils import msg2np as _msg2np + # Module-level logger for cases where self.get_logger() is not available logger = logging.getLogger(__name__) + +def _nearest_tangent(path_world: list, robot_xy: np.ndarray) -> np.ndarray: + """Direction of the path segment closest to the robot.""" + d_best = float('inf') + i_best = 0 + for i in range(len(path_world) - 1): + d = np.linalg.norm(path_world[i][:2] - robot_xy) + if d < d_best: + d_best = d + i_best = i + return path_world[i_best + 1][:2] - path_world[i_best][:2] + + class CmdVelControlNode(Node): def __init__(self): super().__init__('cmd_vel_control_node') @@ -18,31 +32,58 @@ def __init__(self): self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10) self.pose_sub = self.create_subscription(Odometry, '/slam/odometry', self.pose_callback, 10) self.create_subscription(Path, '/planning/trajectory_path', self.path_callback, 10) - self.T_robot_to_camera = np.array([ - [0, -1, 0, 0], - [0, 0, -1, 0], - [1, 0, 0, 0], - [0, 0, 0, 1]] - ) + self.last_path_time = 0.0 self.pose = None - self.path = None + self.path_world = [] # === Control loop (ported from planning_node_compare style) === - self.cmd_rate_hz = 20.0 + self.cmd_rate_hz = 30.0 self.path_stale_slow_s = 0.3 self.path_stale_stop_s = 0.6 - self.max_linear_acc = 0.4 # m/s^2 - self.max_angular_acc = 0.8 # rad/s^2 + self.max_linear_speed = 0.8 + self.max_linear_acc = 2.0 # m/s^2 + self.max_angular_acc = 2.5 # rad/s^2 + self.max_angular_speed = 0.5 self.latest_cmd = Twist() self.prev_cmd = Twist() self.last_cmd_pub_time = time.monotonic() self.last_path_update_time = None self.cmd_timer = self.create_timer(1.0 / self.cmd_rate_hz, self.cmd_timer_callback) - + + def _update_cmd(self): + if not self.path_world or self.pose is None: + return + T, _ = _msg2np(self.pose) + robot_xy = T[:2, 3] + forward_xy = (T[:3, :3] @ np.array([0.0, 0.0, 1.0]))[:2] + + tangent = _nearest_tangent(self.path_world, robot_xy) + norm_f = np.linalg.norm(forward_xy) + norm_t = np.linalg.norm(tangent) + if norm_f < 1e-6 or norm_t < 1e-6: + return + + cross = forward_xy[0] / norm_f * tangent[1] / norm_t - forward_xy[1] / norm_f * tangent[0] / norm_t + dot = forward_xy[0] / norm_f * tangent[0] / norm_t + forward_xy[1] / norm_f * tangent[1] / norm_t + heading_err = float(np.arctan2(cross, dot)) + + dist_to_end = float(np.linalg.norm(self.path_world[-1][:2] - robot_xy)) + dist_scale = float(np.clip(dist_to_end, 0.2, 1.0)) + heading_scale = max(0.0, float(np.cos(heading_err))) + + vx = float(np.clip(self.max_linear_speed * heading_scale * dist_scale, 0.0, self.max_linear_speed)) + if abs(heading_err) > 1.0: + vx *= 0.40 + vyaw = float(np.clip(1.8 * heading_err, -self.max_angular_speed, self.max_angular_speed)) + + self.latest_cmd.linear.x = vx + self.latest_cmd.angular.z = vyaw + def pose_callback(self, msg): self.pose = msg + self._update_cmd() def _clamp_step(self, target: float, current: float, max_delta: float) -> float: return float(np.clip(target - current, -max_delta, max_delta) + current) @@ -74,61 +115,30 @@ def cmd_timer_callback(self): self.cmd_pub.publish(out) self.prev_cmd = out - + def path_callback(self, msg): - if msg is None or self.pose is None: + if msg is None or len(msg.poses) < 2: return - if len(msg.poses) < 2: - return - self.path = msg - - current_time = self.get_clock().now().to_msg().sec + self.get_clock().now().to_msg().nanosec * 1e-9 - - self.last_path_time = current_time + self.path_world = [ + np.array([p.pose.position.x, p.pose.position.y, p.pose.position.z], dtype=np.float32) + for p in msg.poses + ] self.last_path_update_time = time.monotonic() - - def msg2np(msg): - T = np.eye(4) - position = msg.pose.position - rot = msg.pose.orientation - quat = [rot.x, rot.y, rot.z, rot.w] - T[:3, :3] = R.from_quat(quat).as_matrix() - T[:3, 3] = np.array([position.x, position.y, position.z]).ravel() - return T - - T1 = msg2np(self.path.poses[0]) - T2 = msg2np(self.path.poses[1]) - T_robot_1 = T1 @ self.T_robot_to_camera - T_robot_2 = T2 @ self.T_robot_to_camera - T_robot_2_to_1 = np.linalg.inv(T_robot_1) @ T_robot_2 - p = T_robot_2_to_1[:3, 3] - dt = 0.1 # Planning node trajectory time step (duration=2.0, dt=0.1) - linear_velocity_vec = p / dt - r = R.from_matrix(T_robot_2_to_1[:3, :3]) - angular_velocity_vec = r.as_rotvec() / dt - - vx = np.clip(linear_velocity_vec[0], -0.1, 0.3) - vy = 0.0 - vyaw = np.clip(angular_velocity_vec[2], -0.8, 0.8) - self.latest_cmd.linear.x = float(vx) - self.latest_cmd.linear.y = float(vy) - self.latest_cmd.angular.z = float(vyaw) - age = 0.0 if self.last_path_update_time is None else (time.monotonic() - self.last_path_update_time) - self.logger.debug(f"cmd vx={vx:.3f} vyaw={vyaw:.3f} path_age={age:.2f}s") + self._update_cmd() def destroy_node(self): self.logger.info("Destroying cmd_vel_control connection.") super().destroy_node() - + def main(args=None): rclpy.init(args=args) - + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(filename)s:%(lineno)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) - + node = CmdVelControlNode() try: rclpy.spin(node) @@ -138,6 +148,6 @@ def main(args=None): node.destroy_node() if rclpy.ok(): rclpy.shutdown() - + if __name__ == '__main__': main()