Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 297 additions & 5 deletions examples/mcap_record_replay/python/common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Shared pipeline for the record / replay scripts.
"""Shared pipeline and visualization helpers for the record / replay / live scripts.

Includes hand, controller, and full-body pipeline builders plus the rendering
helpers used by the replay scripts (``HandJoints``, ``HAND_BONES``,
``BODY_BONES``).
Pipeline builders: ``build_hand_pipeline``, ``build_controller_pipeline``,
``build_full_body_pipeline``.

Viz classes (used by replay_* and live_* scripts): ``HandViz``,
``ControllerViz``, ``FullBodyViz``.

Rendering helpers: ``HandJoints``, ``HAND_BONES``, ``BODY_BONES``.
"""

import numpy as np
import viser

from isaacteleop.retargeting_engine.deviceio_source_nodes import (
ControllersSource,
Expand All @@ -33,7 +38,10 @@
HandInput,
HandInputIndex,
)
from isaacteleop.retargeting_engine.tensor_types.indices import BodyJointPicoIndex
from isaacteleop.retargeting_engine.tensor_types.indices import (
BodyJointPicoIndex,
ControllerInputIndex,
)
from isaacteleop.retargeting_engine.tensor_types.ndarray_types import (
DLDataType,
NDArrayType,
Expand All @@ -46,6 +54,15 @@
HANDS_CHANNEL = "hands"
BODY_JOINT_NAMES = [joint.name for joint in BodyJointPicoIndex]

# ---------------------------------------------------------------------------
# Color palette shared across all viz scripts
# ---------------------------------------------------------------------------

LEFT_COLOR: tuple[float, float, float] = (0.25, 0.85, 0.35)
RIGHT_COLOR: tuple[float, float, float] = (0.35, 0.55, 0.95)
INVALID_COLOR: tuple[float, float, float] = (1.0, 0.0, 0.0)
TRACKED_COLOR: tuple[float, float, float] = (0.25, 0.85, 0.35)


def _positions_group(name: str) -> TensorGroupType:
return TensorGroupType(
Expand Down Expand Up @@ -202,3 +219,278 @@ def build_full_body_pipeline():
(23, 24),
(24, 25),
)


# ---------------------------------------------------------------------------
# Shared viser visualization classes
# ---------------------------------------------------------------------------


def _bone_segments(positions: np.ndarray) -> np.ndarray:
"""Return (N, 2, 3) segment array for the parent→child hand bones."""
return np.stack(
[np.stack([positions[a], positions[b]], axis=0) for a, b in HAND_BONES],
axis=0,
).astype(np.float32)


def _valid_bone_segments(positions: np.ndarray, valid: np.ndarray) -> np.ndarray:
"""Return (N, 2, 3) segment array for body bones whose both endpoints are valid."""
segments: list[np.ndarray] = []
for a, b in BODY_BONES:
if valid[a] and valid[b]:
segments.append(np.stack([positions[a], positions[b]], axis=0))
if not segments:
return np.zeros((0, 2, 3), dtype=np.float32)
return np.stack(segments, axis=0).astype(np.float32)


def _segment(start: np.ndarray, end: np.ndarray) -> np.ndarray:
return np.stack([start, end], axis=0).astype(np.float32)


def controller_state(controller) -> dict:
"""Extract a plain-dict snapshot from a controller TensorGroup."""
if controller.is_none:
return {
"aim_pos": None,
"grip_pos": None,
"aim_valid": False,
"grip_valid": False,
"trigger": 0.0,
"squeeze": 0.0,
"thumbstick_xy": (0.0, 0.0),
"primary_click": False,
"secondary_click": False,
"thumbstick_click": False,
"menu_click": False,
"tracked": False,
}

aim_valid = bool(controller[ControllerInputIndex.AIM_IS_VALID])
grip_valid = bool(controller[ControllerInputIndex.GRIP_IS_VALID])
return {
"aim_pos": np.asarray(
controller[ControllerInputIndex.AIM_POSITION], dtype=np.float32
),
"grip_pos": np.asarray(
controller[ControllerInputIndex.GRIP_POSITION], dtype=np.float32
),
"aim_valid": aim_valid,
"grip_valid": grip_valid,
"trigger": float(controller[ControllerInputIndex.TRIGGER_VALUE]),
"squeeze": float(controller[ControllerInputIndex.SQUEEZE_VALUE]),
"thumbstick_xy": (
float(controller[ControllerInputIndex.THUMBSTICK_X]),
float(controller[ControllerInputIndex.THUMBSTICK_Y]),
),
"primary_click": float(controller[ControllerInputIndex.PRIMARY_CLICK]) > 0.5,
"secondary_click": float(controller[ControllerInputIndex.SECONDARY_CLICK])
> 0.5,
"thumbstick_click": float(controller[ControllerInputIndex.THUMBSTICK_CLICK])
> 0.5,
"menu_click": float(controller[ControllerInputIndex.MENU_CLICK]) > 0.5,
"tracked": aim_valid or grip_valid,
}


class HandViz:
"""Per-hand viser handles (joint cloud + skeleton segments)."""

def __init__(
self,
server: viser.ViserServer,
name: str,
color: tuple[float, float, float],
):
self.color = np.array(color, dtype=np.float32)
zero_pts = np.zeros((26, 3), dtype=np.float32)
zero_segs = np.zeros((len(HAND_BONES), 2, 3), dtype=np.float32)

self.points = server.scene.add_point_cloud(
name=f"/{name}/joints",
points=zero_pts,
colors=np.tile(self.color, (26, 1)),
point_size=0.008,
)
self.bones = server.scene.add_line_segments(
name=f"/{name}/bones",
points=zero_segs,
colors=np.tile(self.color, (len(HAND_BONES), 2, 1)),
line_width=2.0,
)

def update(self, positions: np.ndarray, valid: bool) -> None:
if valid:
self.points.points = positions.astype(np.float32)
self.points.colors = np.tile(self.color, (positions.shape[0], 1))
self.bones.points = _bone_segments(positions)
else:
zero_pts = np.zeros_like(positions, dtype=np.float32)
self.points.points = zero_pts
self.points.colors = np.tile(INVALID_COLOR, (positions.shape[0], 1))
self.bones.points = np.zeros((len(HAND_BONES), 2, 3), dtype=np.float32)


class ControllerViz:
"""Per-controller viser handles (3D pose + live input-state HUD)."""

def __init__(
self,
server: viser.ViserServer,
name: str,
color: tuple[float, float, float],
):
self.color = np.array(color, dtype=np.float32)
zero_pt = np.zeros((1, 3), dtype=np.float32)
zero_seg = np.zeros((0, 2, 3), dtype=np.float32)
zero_seg_colors = np.zeros((0, 2, 3), dtype=np.float32)

self.aim = server.scene.add_point_cloud(
name=f"/{name}/aim",
points=zero_pt,
colors=np.tile(self.color, (1, 1)),
point_size=0.015,
)
self.grip = server.scene.add_point_cloud(
name=f"/{name}/grip",
points=zero_pt,
colors=np.tile(self.color, (1, 1)),
point_size=0.015,
)
self.ray = server.scene.add_line_segments(
name=f"/{name}/ray",
points=zero_seg,
colors=zero_seg_colors,
line_width=2.0,
)

with server.gui.add_folder(name):
self.hud_tracking = server.gui.add_checkbox("tracked", False, disabled=True)
self.hud_aim_valid = server.gui.add_checkbox(
"aim_valid", False, disabled=True
)
self.hud_grip_valid = server.gui.add_checkbox(
"grip_valid", False, disabled=True
)
self.hud_stick = server.gui.add_vector2(
"thumbstick_xy",
initial_value=(0.0, 0.0),
min=(-1.0, -1.0),
max=(1.0, 1.0),
disabled=True,
)
self.hud_trigger_value = server.gui.add_number(
"trigger",
initial_value=0.0,
min=0.0,
max=1.0,
step=0.01,
disabled=True,
)
self.hud_trigger = server.gui.add_progress_bar(0.0)
self.hud_squeeze_value = server.gui.add_number(
"squeeze",
initial_value=0.0,
min=0.0,
max=1.0,
step=0.01,
disabled=True,
)
self.hud_squeeze = server.gui.add_progress_bar(0.0)
self.hud_primary = server.gui.add_checkbox(
"primary_click", False, disabled=True
)
self.hud_secondary = server.gui.add_checkbox(
"secondary_click", False, disabled=True
)
self.hud_stick_click = server.gui.add_checkbox(
"thumbstick_click", False, disabled=True
)
self.hud_menu_click = server.gui.add_checkbox(
"menu_click", False, disabled=True
)

def update(self, state: dict) -> None:
aim_valid: bool = state["aim_valid"]
grip_valid: bool = state["grip_valid"]
aim_pos: np.ndarray | None = state["aim_pos"]
grip_pos: np.ndarray | None = state["grip_pos"]

self.hud_tracking.value = state["tracked"]
self.hud_aim_valid.value = aim_valid
self.hud_grip_valid.value = grip_valid
self.hud_stick.value = state["thumbstick_xy"]
self.hud_trigger.value = max(0.0, min(1.0, state["trigger"]))
self.hud_trigger_value.value = state["trigger"]
self.hud_squeeze.value = max(0.0, min(1.0, state["squeeze"]))
self.hud_squeeze_value.value = state["squeeze"]
self.hud_primary.value = state["primary_click"]
self.hud_secondary.value = state["secondary_click"]
self.hud_stick_click.value = state["thumbstick_click"]
self.hud_menu_click.value = state["menu_click"]

if aim_valid and aim_pos is not None:
self.aim.points = aim_pos.reshape(1, 3).astype(np.float32)
self.aim.colors = np.tile(self.color, (1, 1))
else:
self.aim.points = np.zeros((1, 3), dtype=np.float32)
self.aim.colors = np.tile(INVALID_COLOR, (1, 1))

if grip_valid and grip_pos is not None:
self.grip.points = grip_pos.reshape(1, 3).astype(np.float32)
self.grip.colors = np.tile(self.color, (1, 1))
else:
self.grip.points = np.zeros((1, 3), dtype=np.float32)
self.grip.colors = np.tile(INVALID_COLOR, (1, 1))

if aim_valid and grip_valid and aim_pos is not None and grip_pos is not None:
seg = _segment(grip_pos, aim_pos).reshape(1, 2, 3)
self.ray.points = seg
self.ray.colors = np.tile(self.color, (1, 2, 1))
else:
self.ray.points = np.zeros((0, 2, 3), dtype=np.float32)
self.ray.colors = np.zeros((0, 2, 3), dtype=np.float32)


class FullBodyViz:
"""Viser handles for full-body skeleton (joint cloud + skeleton segments)."""

def __init__(self, server: viser.ViserServer):
self.color = np.array(TRACKED_COLOR, dtype=np.float32)
zero_pts = np.zeros((len(BODY_JOINT_NAMES), 3), dtype=np.float32)
zero_segs = np.zeros((0, 2, 3), dtype=np.float32)

self.points = server.scene.add_point_cloud(
name="/full_body/joints",
points=zero_pts,
colors=np.tile(self.color, (len(BODY_JOINT_NAMES), 1)),
point_size=0.01,
)
self.bones = server.scene.add_line_segments(
name="/full_body/bones",
points=zero_segs,
colors=np.zeros((0, 2, 3), dtype=np.float32),
line_width=2.0,
)

def update(self, positions: np.ndarray | None, valid: np.ndarray | None) -> None:
if positions is None or valid is None:
zero_pts = np.zeros((len(BODY_JOINT_NAMES), 3), dtype=np.float32)
self.points.points = zero_pts
self.points.colors = np.tile(INVALID_COLOR, (len(BODY_JOINT_NAMES), 1))
self.bones.points = np.zeros((0, 2, 3), dtype=np.float32)
self.bones.colors = np.zeros((0, 2, 3), dtype=np.float32)
return

positions = positions.astype(np.float32)
valid_bool = valid.astype(bool)
self.points.points = positions

point_colors = np.tile(self.color, (positions.shape[0], 1))
point_colors[~valid_bool] = INVALID_COLOR
self.points.colors = point_colors

segs = _valid_bone_segments(positions, valid_bool)
self.bones.points = segs
self.bones.colors = np.tile(self.color, (segs.shape[0], 2, 1))
27 changes: 27 additions & 0 deletions examples/mcap_record_replay/python/default.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# CloudXR env-var overrides for the mcap_record_replay example scripts.
#
# Passed to CloudXRLauncher via the --env-file argument. Format: one
# KEY=value per line; # comments and blank lines are ignored; values are
# expanded for $VARS and ~ (via os.path.expandvars / os.path.expanduser).
#
# Precedence (highest first):
# 1. This file
# 2. Process environment variables (e.g. Docker / shell exports)
# 3. Built-in defaults in isaacteleop/cloudxr/env_config.py
#
# Reserved keys — always computed by the runtime, silently ignored if set here:
# XR_RUNTIME_JSON, XRT_NO_STDIN, NV_CXR_RUNTIME_DIR, NV_CXR_OUTPUT_DIR

# Device profile advertised by the CloudXR runtime.
# "auto-webrtc" (built-in default) serves WebXR / browser-based clients over
# WebRTC. Override to "Quest3" when connecting a Meta Quest 3 headset directly.
NV_DEVICE_PROFILE=Quest3

# Input device discovery channels. Both are true by default; pinned here for
# visibility. Leave enabled for XR controller and hand-tracking data.
NV_CXR_ENABLE_PUSH_DEVICES=true
NV_CXR_ENABLE_TENSOR_DATA=true

# Write CloudXR runtime logs to ~/.cloudxr/logs. Useful when diagnosing
# connection issues (e.g. "Failed to get OpenXR system: -35"). true by default.
NV_CXR_FILE_LOGGING=true
Loading
Loading