Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions gr00t_wbc/control/envs/g1/utils/command_sender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import time
from typing import Dict

import numpy as np
from unitree_sdk2py.core.channel import ChannelPublisher
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__HandCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import HandCmd_
from unitree_sdk2py.utils.crc import CRC
Expand All @@ -21,7 +22,7 @@ def __init__(self, config: Dict):
or self.config["ROBOT_TYPE"] == "h1-2_27dof"
):
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_

self.low_cmd = unitree_hg_msg_dds__LowCmd_()
else:
Expand All @@ -44,8 +45,22 @@ def __init__(self, config: Dict):
# init low cmd publisher
self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_)
self.lowcmd_publisher_.Init()
self.InitLowCmd()
self.low_state = None

# wait for first lowstate to set mode_machine (g1/h1-2 robots)
if (
self.config["ROBOT_TYPE"] == "g1_29dof"
or self.config["ROBOT_TYPE"] == "h1-2_21dof"
or self.config["ROBOT_TYPE"] == "h1-2_27dof"
):
self._lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
self._lowstate_subscriber.Init(self._LowStateHandler, 10)
print("Waiting for first lowstate to set mode_machine...")
while self.low_state is None:
time.sleep(0.1)
print(f"mode_machine set to {self.low_state.mode_machine}")

self.InitLowCmd()
self.crc = CRC()

def InitLowCmd(self):
Expand Down Expand Up @@ -73,14 +88,17 @@ def InitLowCmd(self):
or self.config["ROBOT_TYPE"] == "h1-2_21dof"
or self.config["ROBOT_TYPE"] == "h1-2_27dof"
):
self.low_cmd.mode_machine = self.config["UNITREE_LEGGED_CONST"]["MODE_MACHINE"]
self.low_cmd.mode_machine = self.low_state.mode_machine
self.low_cmd.mode_pr = self.config["UNITREE_LEGGED_CONST"]["MODE_PR"]
else:
pass

def is_weak_motor(self, motor_index: int) -> bool:
return motor_index in self.weak_motor_joint_index

def _LowStateHandler(self, msg):
self.low_state = msg

def send_command(self, cmd_q: np.ndarray, cmd_dq: np.ndarray, cmd_tau: np.ndarray):
for i in range(self.config["NUM_MOTORS"]):
motor_index = self.config["JOINT2MOTOR"][i]
Expand Down