Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pip install tinynav
tinynav init
tinynav doctor
tinynav example
tinynav nav
tinynav nav status
tinynav nav start --map-name <map_name>
tinynav nav go --map-name <map_name> [--pois 2,1,0]
tinynav nav stop
tinynav map status
tinynav map start_record
tinynav map stop_record
Expand Down Expand Up @@ -104,3 +107,41 @@ The intent is:
- recording outputs go under `rosbags/`
- map build outputs go under `maps/`
- `map build` consumes a rosbag name from `rosbags/` and produces a map directory in `maps/`


## `tinynav nav` commands

The `tinynav nav` workflow is managed inside a tmux session in the running container.
The CLI currently uses the session name `tinynav_nav`.

### Nav states

- `idle`
- no navigation tmux session exists
- `starting`
- tmux session exists but required ROS nodes are not all visible yet
- `running`
- required ROS nodes are all present

Required ROS nodes:

- `/perception_node`
- `/planning_node`
- `/map_node`
- `/cmd_vel_control_node`

### Commands

- `tinynav nav status`
- reports one of `idle`, `starting`, or `running`
- `tinynav nav start --map-name <map_name>`
- allowed only in `idle`
- starts navigation panes in a tmux session inside the container
- passes the selected map path to `map_node.py`
- `tinynav nav go --map-name <map_name> [--pois 2,1,0]`
- loads `maps/<map_name>/pois.json`
- if `--pois` is provided, reorders the outer keys to match the requested POI id sequence
- keeps each POI object's inner `id` unchanged
- publishes the payload to `/mapping/cmd_pois` as `std_msgs/String`
- `tinynav nav stop`
- stops the tmux-managed navigation workflow inside the container
206 changes: 200 additions & 6 deletions src/tinynav_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import platform
import random
import shlex
import shutil
import subprocess
import sys
Expand Down Expand Up @@ -36,6 +37,7 @@
MAP_RECORD_SESSION = "tinynav_map_record"
MAP_BUILD_SESSION = "tinynav_map_build"
MAP_EDIT_POIS_SESSION = "tinynav_map_edit_pois"
NAV_SESSION = "tinynav_nav"


def _default_workspace_dir() -> str:
Expand Down Expand Up @@ -64,12 +66,28 @@ class DoctorCommand:


@dataclass
class NavCommand:
"""Run a navigation task."""
class NavStatusCommand:
"""Show the current navigation workflow status."""

container_name: str = DEFAULT_CONTAINER_NAME


@dataclass
class NavStartCommand:
"""Start the navigation workflow."""

map_name: Annotated[str, tyro.conf.arg(name="map-name")]
container_name: str = DEFAULT_CONTAINER_NAME


@dataclass
class NavGoCommand:
"""Publish POIs for navigation."""

pois: str | None = None
container_name: str = DEFAULT_CONTAINER_NAME


@dataclass
class ExampleCommand:
"""Run the rosbag example workflow inside the tinynav container."""
Expand Down Expand Up @@ -142,6 +160,19 @@ class SensorsCommand:
MapList = Annotated[MapListCommand, tyro.conf.subcommand(name="list")]
MapCommand = Union[MapStatus, MapStartRecord, MapStopRecord, MapBuild, MapEditPois, MapList]

@dataclass
class NavStopCommand:
"""Stop the navigation workflow."""

container_name: str = DEFAULT_CONTAINER_NAME


NavStatus = Annotated[NavStatusCommand, tyro.conf.subcommand(name="status")]
NavStart = Annotated[NavStartCommand, tyro.conf.subcommand(name="start")]
NavGo = Annotated[NavGoCommand, tyro.conf.subcommand(name="go")]
NavStop = Annotated[NavStopCommand, tyro.conf.subcommand(name="stop")]
NavCommand = Union[NavStatus, NavStart, NavGo, NavStop]

Init = Annotated[InitCommand, tyro.conf.subcommand(name="init")]
Doctor = Annotated[DoctorCommand, tyro.conf.subcommand(name="doctor")]
Nav = Annotated[NavCommand, tyro.conf.subcommand(name="nav")]
Expand Down Expand Up @@ -752,6 +783,40 @@ def _ros2_node_names(container_name: str) -> list[str]:
return [line.strip() for line in result.stdout.splitlines() if line.strip()]


NAV_REQUIRED_NODES = {
"/perception_node": "perception_node.py",
"/planning_node": "planning_node.py",
"/map_node": "map_node.py",
"/cmd_vel_control_node": "cmd_vel_control.py",
}


def _tmux_session_exists(container_name: str, session_name: str) -> bool:
result = _docker_exec_output(container_name, f"tmux has-session -t {session_name}")
return result.returncode == 0


def _nav_status(container_name: str) -> tuple[str, list[str]]:
if not _tmux_session_exists(container_name, NAV_SESSION):
return "idle", []
nodes = set(_ros2_node_names(container_name))
missing = [node for node in NAV_REQUIRED_NODES if node not in nodes]
if missing:
return "starting", missing
return "running", []


def _nav_map_name(container_name: str) -> str | None:
result = _docker_exec_output(container_name, f"tmux show-environment -t {NAV_SESSION} TINYNAV_MAP_NAME")
if result.returncode != 0:
return None
line = (result.stdout or '').strip()
prefix = 'TINYNAV_MAP_NAME='
if not line.startswith(prefix):
return None
return line[len(prefix):]


def _map_status(container_name: str) -> str:
nodes = _ros2_node_names(container_name)
if "/build_map_node" in nodes:
Expand Down Expand Up @@ -801,6 +866,31 @@ def _container_maps_dir() -> Path:
return Path(CONTAINER_WORKSPACE_DIR) / "maps"


def _parse_poi_selection(pois: str) -> list[str]:
values = [value.strip() for value in pois.split(",") if value.strip()]
if not values:
raise ValueError("--pois must be a comma-separated list like 2,1,0")
return values


def _selected_cmd_pois(map_path: Path, pois: str | None) -> dict[str, object]:
pois_path = map_path / "pois.json"
if not pois_path.exists():
raise FileNotFoundError(f"POI file not found: {pois_path}")
with pois_path.open() as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError("pois.json must be a JSON object")
if pois is None:
return data
selected = {}
for index, poi_key in enumerate(_parse_poi_selection(pois)):
if poi_key not in data:
raise KeyError(f"POI {poi_key} not found in {pois_path}")
selected[str(index)] = data[poi_key]
return selected


def _container_rosbags_dir() -> Path:
return Path(CONTAINER_WORKSPACE_DIR) / "rosbags"

Expand Down Expand Up @@ -852,10 +942,108 @@ def run_version(command: VersionCommand) -> int:
return 0


def run_nav(command: NavCommand) -> int:
def run_nav_status(command: NavStatusCommand) -> int:
if not _ensure_runtime_container(command.container_name):
return 1
status, missing = _nav_status(command.container_name)
print(f"tinynav nav status: {status}")
if missing:
print(f" 👉 missing nodes: {', '.join(missing)}")
return 0


def run_nav_start(command: NavStartCommand) -> int:
if not _ensure_runtime_container(command.container_name):
return 1
status, _ = _nav_status(command.container_name)
if status != "idle":
print(f"❌ nav start is only allowed in idle state")
print(f" 👉 current state: {status}")
return 1
container_map_path = _container_maps_dir() / command.map_name
result = _docker_exec_output(
command.container_name,
" && ".join([
f"test -d {container_map_path}",
f"tmux kill-session -t {NAV_SESSION} >/dev/null 2>&1 || true",
f"tmux new-session -d -s {NAV_SESSION}",
f"tmux set-environment -t {NAV_SESSION} TINYNAV_MAP_NAME {command.map_name}",
f"tmux split-window -t {NAV_SESSION} -h",
f"tmux split-window -t {NAV_SESSION}:0.0 -v",
f"tmux split-window -t {NAV_SESSION}:0.1 -v",
f"tmux send-keys -t {NAV_SESSION}:0.0 'source /opt/ros/*/setup.bash >/dev/null 2>&1 && uv run python /tinynav/tinynav/core/perception_node.py' C-m",
f"tmux send-keys -t {NAV_SESSION}:0.1 'source /opt/ros/*/setup.bash >/dev/null 2>&1 && uv run python /tinynav/tinynav/core/planning_node.py' C-m",
f"tmux send-keys -t {NAV_SESSION}:0.2 'source /opt/ros/*/setup.bash >/dev/null 2>&1 && uv run python /tinynav/tinynav/platforms/cmd_vel_control.py' C-m",
f"tmux send-keys -t {NAV_SESSION}:0.3 'source /opt/ros/*/setup.bash >/dev/null 2>&1 && uv run python /tinynav/tinynav/core/map_node.py --tinynav_map_path {container_map_path}' C-m",
]),
)
if result.returncode != 0:
print("❌ Failed to start navigation inside the container.")
if result.stderr or result.stdout:
print(f" 👉 {(result.stderr or result.stdout).strip()}")
return 1
print(f"✅ Started navigation inside container {command.container_name}.")
print(f" 👉 tmux session: {NAV_SESSION}")
print(f" 👉 map: {command.map_name}")
return 0


def run_nav_go(command: NavGoCommand) -> int:
if not _ensure_runtime_container(command.container_name):
return 1
status, _ = _nav_status(command.container_name)
if status != "running":
print("❌ nav go is only allowed in running state")
print(f" 👉 current state: {status}")
return 1
map_name = _nav_map_name(command.container_name)
if map_name is None:
print("❌ Failed to resolve map name from the running nav session.")
return 1
map_path = _maps_dir() / map_name
try:
payload = _selected_cmd_pois(map_path, command.pois)
except (FileNotFoundError, ValueError, KeyError) as exc:
print("❌ Failed to prepare navigation POIs.")
print(f" 👉 {exc}")
return 1
payload_json = json.dumps(payload, separators=(",", ":"))
msg_arg = shlex.quote(f"{{data: {payload_json}}}")
result = _docker_exec_output(
command.container_name,
"source /opt/ros/*/setup.bash >/dev/null 2>&1 && "
f"ros2 topic pub --once /mapping/cmd_pois std_msgs/msg/String {msg_arg}",
)
if result.returncode != 0:
print("❌ Failed to publish navigation POIs inside the container.")
if result.stderr or result.stdout:
print(f" 👉 {(result.stderr or result.stdout).strip()}")
return 1
print(f"✅ Published navigation POIs inside container {command.container_name}.")
print(f" 👉 map: {map_name}")
if command.pois is None:
print(" 👉 pois: all")
else:
print(f" 👉 pois: {command.pois}")
return 0


def run_nav_stop(command: NavStopCommand) -> int:
if not _ensure_runtime_container(command.container_name):
return 1
print("tinynav nav: not implemented yet")
if not _tmux_session_exists(command.container_name, NAV_SESSION):
print("✅ Navigation is not running.")
return 0
for pane in ("0.0", "0.1", "0.2", "0.3"):
_docker_exec_output(command.container_name, f"tmux send-keys -t {NAV_SESSION}:{pane} C-c")
time.sleep(1.0)
result = _docker_exec_output(command.container_name, f"tmux kill-session -t {NAV_SESSION}")
if result.returncode != 0:
print("❌ Failed to stop navigation inside the container.")
if result.stderr or result.stdout:
print(f" 👉 {(result.stderr or result.stdout).strip()}")
return 1
print(f"✅ Stopped navigation inside container {command.container_name}.")
return 0


Expand Down Expand Up @@ -1158,8 +1346,14 @@ def run(command: Command) -> int:
return run_init(command)
case DoctorCommand():
return run_doctor(command)
case NavCommand():
return run_nav(command)
case NavStatusCommand():
return run_nav_status(command)
case NavStartCommand():
return run_nav_start(command)
case NavGoCommand():
return run_nav_go(command)
case NavStopCommand():
return run_nav_stop(command)
case ExampleCommand():
return run_example(command)
case VersionCommand():
Expand Down
41 changes: 41 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,46 @@
import json

import pytest

from tinynav_cli.cli import _parse_poi_selection, _selected_cmd_pois
from tinynav_cli.version import __version__


def test_version() -> None:
assert __version__ == "0.0.13"


def test_parse_poi_selection() -> None:
assert _parse_poi_selection("2,1,0") == ["2", "1", "0"]


def test_parse_poi_selection_rejects_empty() -> None:
with pytest.raises(ValueError):
_parse_poi_selection("")


def test_selected_cmd_pois_preserves_inner_ids(tmp_path) -> None:
map_path = tmp_path / "maps" / "demo"
map_path.mkdir(parents=True)
pois = {
"0": {"id": 0, "name": "POI_0", "position": [0, 0, 0]},
"1": {"id": 1, "name": "POI_1", "position": [1, 1, 1]},
"2": {"id": 2, "name": "POI_2", "position": [2, 2, 2]},
}
(map_path / "pois.json").write_text(json.dumps(pois))

selected = _selected_cmd_pois(map_path, "2,1,0")

assert list(selected.keys()) == ["0", "1", "2"]
assert selected["0"]["id"] == 2
assert selected["1"]["id"] == 1
assert selected["2"]["id"] == 0


def test_selected_cmd_pois_without_filter_returns_all(tmp_path) -> None:
map_path = tmp_path / "maps" / "demo"
map_path.mkdir(parents=True)
pois = {"0": {"id": 0}}
(map_path / "pois.json").write_text(json.dumps(pois))

assert _selected_cmd_pois(map_path, None) == pois
11 changes: 11 additions & 0 deletions tests/test_cli_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,14 @@ def test_sensors_help_runs() -> None:
text = output_text(result).lower()
assert "container" in text
assert "preview" in text



def test_nav_help_runs() -> None:
result = run_cli("nav", "--help")
assert result.returncode == 0
text = output_text(result)
assert "status" in text
assert "start" in text
assert "go" in text
assert "stop" in text