Skip to content
118 changes: 64 additions & 54 deletions tinynav/platforms/cmd_vel_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,87 @@
from geometry_msgs.msg import Twist
from nav_msgs.msg import Path
from nav_msgs.msg import Odometry
from scipy.spatial.transform import Rotation as R
import numpy as np
import logging
import time

from tinynav.core.math_utils import msg2np as _msg2np

# Module-level logger for cases where self.get_logger() is not available
logger = logging.getLogger(__name__)


def _nearest_tangent(path_world: list, robot_xy: np.ndarray) -> np.ndarray:
"""Direction of the path segment closest to the robot."""
d_best = float('inf')
i_best = 0
for i in range(len(path_world) - 1):
d = np.linalg.norm(path_world[i][:2] - robot_xy)
if d < d_best:
d_best = d
i_best = i
return path_world[i_best + 1][:2] - path_world[i_best][:2]


class CmdVelControlNode(Node):
def __init__(self):
super().__init__('cmd_vel_control_node')
self.logger = self.get_logger() # Use ROS2 logger
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.pose_sub = self.create_subscription(Odometry, '/slam/odometry', self.pose_callback, 10)
self.create_subscription(Path, '/planning/trajectory_path', self.path_callback, 10)
self.T_robot_to_camera = np.array([
[0, -1, 0, 0],
[0, 0, -1, 0],
[1, 0, 0, 0],
[0, 0, 0, 1]]
)

self.last_path_time = 0.0
self.pose = None
self.path = None
self.path_world = []

# === Control loop (ported from planning_node_compare style) ===
self.cmd_rate_hz = 20.0
self.cmd_rate_hz = 30.0
self.path_stale_slow_s = 0.3
self.path_stale_stop_s = 0.6
self.max_linear_acc = 0.4 # m/s^2
self.max_angular_acc = 0.8 # rad/s^2
self.max_linear_speed = 0.8
self.max_linear_acc = 2.0 # m/s^2
self.max_angular_acc = 2.5 # rad/s^2
self.max_angular_speed = 0.5

self.latest_cmd = Twist()
self.prev_cmd = Twist()
self.last_cmd_pub_time = time.monotonic()
self.last_path_update_time = None
self.cmd_timer = self.create_timer(1.0 / self.cmd_rate_hz, self.cmd_timer_callback)


def _update_cmd(self):
if not self.path_world or self.pose is None:
return
T, _ = _msg2np(self.pose)
robot_xy = T[:2, 3]
forward_xy = (T[:3, :3] @ np.array([0.0, 0.0, 1.0]))[:2]

tangent = _nearest_tangent(self.path_world, robot_xy)
norm_f = np.linalg.norm(forward_xy)
norm_t = np.linalg.norm(tangent)
if norm_f < 1e-6 or norm_t < 1e-6:
return

cross = forward_xy[0] / norm_f * tangent[1] / norm_t - forward_xy[1] / norm_f * tangent[0] / norm_t
dot = forward_xy[0] / norm_f * tangent[0] / norm_t + forward_xy[1] / norm_f * tangent[1] / norm_t
heading_err = float(np.arctan2(cross, dot))

dist_to_end = float(np.linalg.norm(self.path_world[-1][:2] - robot_xy))
dist_scale = float(np.clip(dist_to_end, 0.2, 1.0))
heading_scale = max(0.0, float(np.cos(heading_err)))

vx = float(np.clip(self.max_linear_speed * heading_scale * dist_scale, 0.0, self.max_linear_speed))
if abs(heading_err) > 1.0:
vx *= 0.40
vyaw = float(np.clip(1.8 * heading_err, -self.max_angular_speed, self.max_angular_speed))

self.latest_cmd.linear.x = vx
self.latest_cmd.angular.z = vyaw

def pose_callback(self, msg):
self.pose = msg
self._update_cmd()

def _clamp_step(self, target: float, current: float, max_delta: float) -> float:
return float(np.clip(target - current, -max_delta, max_delta) + current)
Expand Down Expand Up @@ -74,61 +115,30 @@ def cmd_timer_callback(self):

self.cmd_pub.publish(out)
self.prev_cmd = out

def path_callback(self, msg):
if msg is None or self.pose is None:
if msg is None or len(msg.poses) < 2:
return
if len(msg.poses) < 2:
return
self.path = msg

current_time = self.get_clock().now().to_msg().sec + self.get_clock().now().to_msg().nanosec * 1e-9

self.last_path_time = current_time
self.path_world = [
np.array([p.pose.position.x, p.pose.position.y, p.pose.position.z], dtype=np.float32)
for p in msg.poses
]
self.last_path_update_time = time.monotonic()

def msg2np(msg):
T = np.eye(4)
position = msg.pose.position
rot = msg.pose.orientation
quat = [rot.x, rot.y, rot.z, rot.w]
T[:3, :3] = R.from_quat(quat).as_matrix()
T[:3, 3] = np.array([position.x, position.y, position.z]).ravel()
return T

T1 = msg2np(self.path.poses[0])
T2 = msg2np(self.path.poses[1])
T_robot_1 = T1 @ self.T_robot_to_camera
T_robot_2 = T2 @ self.T_robot_to_camera
T_robot_2_to_1 = np.linalg.inv(T_robot_1) @ T_robot_2
p = T_robot_2_to_1[:3, 3]
dt = 0.1 # Planning node trajectory time step (duration=2.0, dt=0.1)
linear_velocity_vec = p / dt
r = R.from_matrix(T_robot_2_to_1[:3, :3])
angular_velocity_vec = r.as_rotvec() / dt

vx = np.clip(linear_velocity_vec[0], -0.1, 0.3)
vy = 0.0
vyaw = np.clip(angular_velocity_vec[2], -0.8, 0.8)
self.latest_cmd.linear.x = float(vx)
self.latest_cmd.linear.y = float(vy)
self.latest_cmd.angular.z = float(vyaw)
age = 0.0 if self.last_path_update_time is None else (time.monotonic() - self.last_path_update_time)
self.logger.debug(f"cmd vx={vx:.3f} vyaw={vyaw:.3f} path_age={age:.2f}s")
self._update_cmd()

def destroy_node(self):
self.logger.info("Destroying cmd_vel_control connection.")
super().destroy_node()

def main(args=None):
rclpy.init(args=args)

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(filename)s:%(lineno)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)

node = CmdVelControlNode()
try:
rclpy.spin(node)
Expand All @@ -138,6 +148,6 @@ def main(args=None):
node.destroy_node()
if rclpy.ok():
rclpy.shutdown()

if __name__ == '__main__':
main()
Loading