@@ -27,33 +27,20 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
27
27
self .max_steps = 40_000_000
28
28
self .num_envs = num_envs
29
29
self .num_single_obs = obs_cfg ["num_single_obs" ]
30
- self .single_num_privileged_obs = obs_cfg ["single_num_privileged_obs" ]
31
30
self .num_actions = env_cfg ["num_actions" ]
32
31
self .num_commands = command_cfg ["num_commands" ]
33
32
34
- self .add_noise = obs_cfg .get ("add_noise" , False )
35
-
36
33
# observation history
37
34
self .frame_stack = obs_cfg .get ("frame_stack" , 1 )
38
35
self .num_obs = self .num_single_obs * self .frame_stack
39
36
self .c_frame_stack = obs_cfg .get ("c_frame_stack" , 1 )
40
37
self .obs_history = collections .deque (maxlen = self .frame_stack )
41
38
self .critic_history = collections .deque (maxlen = self .c_frame_stack )
42
39
43
- # privileged observation config
44
- self .num_privileged_obs = self .single_num_privileged_obs * self .c_frame_stack
45
- print (f"Number of privileged observations: { self .num_privileged_obs } " )
46
-
47
40
for _ in range (self .frame_stack ):
48
41
self .obs_history .append (
49
42
torch .zeros (self .num_envs , self .num_single_obs , dtype = torch .float , device = self .device )
50
43
)
51
- for _ in range (self .c_frame_stack ):
52
- self .critic_history .append (
53
- torch .zeros (
54
- self .num_envs , self .single_num_privileged_obs , dtype = torch .float , device = self .device
55
- )
56
- )
57
44
58
45
self .simulate_action_latency = True # there is a 1 step latency on real robot
59
46
self .dt = 0.02 # control frequence on real robot is 50Hz
@@ -103,11 +90,6 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
103
90
collision = True
104
91
))
105
92
106
- # terrain measurement
107
- self .measure_heights = False
108
- self .measured_heights = torch .zeros ((self .num_envs , 1 ), device = self .device ) # 初始化为全零张量
109
- self .height_samples = 64
110
-
111
93
# add robot
112
94
self .base_init_pos = torch .tensor (self .env_cfg ["base_init_pos" ], device = self .device )
113
95
self .base_init_quat = torch .tensor (self .env_cfg ["base_init_quat" ], device = self .device )
@@ -121,13 +103,10 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
121
103
)
122
104
123
105
# build
124
- print (f'num_envs: { num_envs } ' )
125
106
self .scene .build (n_envs = num_envs , env_spacing = (1.0 ,1.0 ))
126
107
127
- print (f'{ self .env_cfg ["dof_names" ]} ' )
128
108
# names to indices
129
109
self .motor_dofs = [self .robot .get_joint (name ).dof_idx_local for name in self .env_cfg ["dof_names" ]]
130
- print (f"Motor dofs: { self .motor_dofs } " ) # Motor dofs: [6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
131
110
# Initialize legs_joints mapping with bounds checking
132
111
self .legs_joints = {}
133
112
joint_names = ["left_hip_pitch" , "left_knee_pitch" , "left_ankle_pitch" ,
@@ -139,34 +118,12 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
139
118
else :
140
119
print (f"Warning: Joint { name } not found in motor_dofs" )
141
120
142
- print (f"Legs joints: { self .legs_joints } " )
143
- #TODO: Initialize legs_joints mapping correctly
144
-
145
- # Initialize feet indices
146
- self .feet_indices = [
147
- self .robot .get_link ("foot_left" ).idx - 6 ,
148
- self .robot .get_link ("foot_right" ).idx - 6 ,
149
- ]
150
- print (f"Feet indices: { self .feet_indices } " )
151
121
# Get number of bodies in the robot
152
122
self .num_bodies = self .robot .n_links
153
- print (f"Number of bodies in the robot: { self .num_bodies } " )
154
- print (f"Robot links: { self .robot .links } " )
155
- for i ,e in enumerate (self .robot .links ):
156
- # print(f"Body {i} name: {e}")
157
- print (f"Body { i } name: { e .name } " )
158
- # Initialize observation related buffers
159
- self .contact_forces = torch .zeros ((self .num_envs , len (self .feet_indices ), 3 ), device = self .device )
160
-
161
- # print(f"Contact forces length: {len(self.contact_forces)}")
162
- # print(f"First element type: {type(self.contact_forces[0]) if len(self.contact_forces) > 0 else None}")
163
- # print(f"First element length: {len(self.contact_forces[0]) if len(self.contact_forces) > 0 else None}")
164
-
165
- self .net_contact_forces = torch .zeros ((self .num_envs , self .num_bodies , 3 ), device = self .device )
123
+
166
124
self .rand_push_force = torch .zeros ((self .num_envs , 3 ), device = self .device )
167
125
self .rand_push_torque = torch .zeros ((self .num_envs , 3 ), device = self .device )
168
126
self .env_frictions = torch .ones ((self .num_envs ,), device = self .device )
169
- self .body_mass = torch .ones ((self .num_envs ,), device = self .device ) * 30.0
170
127
171
128
# PD control parameters
172
129
self .robot .set_dofs_kp ([self .env_cfg ["kp" ]] * self .num_actions , self .motor_dofs )
@@ -188,8 +145,6 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
188
145
)
189
146
# observation buffers
190
147
self .obs_buf = torch .zeros ((self .num_envs , self .num_obs ), device = self .device , dtype = gs .tc_float )
191
- if self .num_privileged_obs is not None :
192
- self .privileged_obs_buf = torch .zeros ((self .num_envs , self .num_privileged_obs ), device = self .device , dtype = gs .tc_float )
193
148
194
149
self .rew_buf = torch .zeros ((self .num_envs ,), device = self .device , dtype = gs .tc_float )
195
150
self .reset_buf = torch .ones ((self .num_envs ,), device = self .device , dtype = gs .tc_int )
@@ -218,20 +173,15 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
218
173
"observations" : {},
219
174
} # extra information for logging
220
175
221
- self .noise_scale_vec = self ._get_noise_scale_vec (obs_cfg )
222
-
223
176
# Initialize missing variables
224
177
self .default_joint_pd_target = self .default_dof_pos .clone ()
225
178
self .base_euler = torch .zeros ((self .num_envs , 3 ), device = self .device )
226
- self .filtered_base_height = torch .zeros ((self .num_envs , 1 ), device = self .device ) # 初始化为二维张量
227
179
self .filtered_base_height = torch .zeros (self .num_envs , device = self .device )
228
180
# Initialize terrain difficulty
229
181
self .terrain_difficulty = torch .zeros (self .num_envs , device = self .device )
230
182
self .difficulty_factors = {
231
183
"random_uniform_terrain" : 0.3 ,
232
184
}
233
- self .root_states = torch .zeros ((self .num_envs , 13 ), device = self .device )
234
- self .rigid_state = torch .zeros ((self .num_envs , self .num_bodies , 13 ), device = self .device )
235
185
236
186
def _resample_commands (self , envs_idx ):
237
187
self .commands [envs_idx , 0 ] = gs_rand_float (* self .command_cfg ["lin_vel_x_range" ], (len (envs_idx ),), self .device )
@@ -261,13 +211,6 @@ def step(self, actions):
261
211
self .dof_pos [:] = self .robot .get_dofs_position (self .motor_dofs )
262
212
self .dof_vel [:] = self .robot .get_dofs_velocity (self .motor_dofs )
263
213
264
- # Get net contact forces for all bodies from rigid solver
265
- # contact_forces = self.robot.get_links_net_contact_force()
266
- # if contact_forces is not None:
267
- # for env_idx in range(self.num_envs):
268
- # if env_idx < contact_forces.shape[0]:
269
- # self.net_contact_forces[env_idx] = contact_forces[env_idx][:self.num_bodies].clone()
270
-
271
214
# resample commands
272
215
envs_idx = (
273
216
(self .episode_length_buf % int (self .env_cfg ["resampling_time_s" ] / self .dt ) == 0 )
@@ -305,42 +248,15 @@ def step(self, actions):
305
248
306
249
# obs, rewards, dones, infos
307
250
return self .obs_buf , self .rew_buf , self .reset_buf , self .extras
308
- # {
309
- # "observations": {
310
- # "critic": self.privileged_obs_buf
311
- # },
312
- # **self.extras
313
- # }
314
-
315
251
316
252
def get_observations (self ):
317
253
return self .obs_buf , self .extras
318
- # , {
319
- # "observations": {
320
- # "critic": self.privileged_obs_buf
321
- # },
322
- # **self.extras
323
- # }
324
254
325
255
def _get_phase (self ):
326
256
cycle_time = self .env_cfg .get ("cycle_time" , 1.0 )
327
257
phase = self .episode_length_buf * self .dt / cycle_time
328
258
return phase
329
259
330
- def _get_gait_phase (self ):
331
- # return float mask 1 is stance, 0 is swing
332
- phase = self ._get_phase ()
333
- sin_pos = torch .sin (2 * torch .pi * phase )
334
- # Add double support phase
335
- stance_mask = torch .zeros ((self .num_envs , 2 ), device = self .device )
336
- # left foot stance
337
- stance_mask [:, 0 ] = sin_pos >= 0
338
- # right foot stance
339
- stance_mask [:, 1 ] = sin_pos < 0
340
- # Double support phase
341
- stance_mask [torch .abs (sin_pos ) < 0.1 ] = 1
342
- return stance_mask
343
-
344
260
def compute_ref_state (self ):
345
261
phase = self ._get_phase ()
346
262
sin_pos = torch .sin (2 * torch .pi * phase )
@@ -377,70 +293,17 @@ def safe_update(joint_name, scale):
377
293
# Double support phase
378
294
self .ref_dof_pos [torch .abs (sin_pos ) < 0.1 ] = 0
379
295
380
- self .ref_action = 2 * self .ref_dof_pos
381
-
382
- def _get_noise_scale_vec (self , cfg ):
383
- """Sets a vector used to scale the noise added to the observations.
384
- [NOTE]: Must be adapted when changing the observations structure
385
-
386
- Args:
387
- cfg (Dict): Environment config file
388
-
389
- Returns:
390
- [torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
391
- """
392
- num_actions = self .num_actions
393
- noise_vec = torch .zeros (self .num_single_obs , device = self .device )
394
- self .add_noise = cfg .get ("add_noise" , False )
395
- noise_scales = cfg ["noise_scales" ]
396
- noise_vec [0 :5 ] = 0.0 # commands
397
- noise_vec [5 : (num_actions + 5 )] = noise_scales ["dof_pos" ] * self .obs_scales ["dof_pos" ]
398
- noise_vec [(num_actions + 5 ) : (2 * num_actions + 5 )] = noise_scales ["dof_vel" ] * self .obs_scales ["dof_vel" ]
399
- noise_vec [(2 * num_actions + 5 ) : (3 * num_actions + 5 )] = 0.0 # previous actions
400
- noise_vec [(3 * num_actions + 5 ) : (3 * num_actions + 5 ) + 3 ] = (
401
- noise_scales ["ang_vel" ] * self .obs_scales ["ang_vel" ]
402
- ) # ang vel
403
- noise_vec [(3 * num_actions + 5 ) + 3 : (3 * num_actions + 5 )] = (
404
- noise_scales ["quat" ] * self .obs_scales ["quat" ]
405
- ) # euler x,y
406
- return noise_vec
407
-
408
296
def compute_observations (self ):
409
297
phase = self ._get_phase ()
410
298
self .compute_ref_state ()
411
299
412
300
sin_pos = torch .sin (2 * torch .pi * phase ).unsqueeze (1 )
413
301
cos_pos = torch .cos (2 * torch .pi * phase ).unsqueeze (1 )
414
302
415
- # stance_mask = self._get_gait_phase()
416
- # contact_mask = self.contact_forces[:, self.feet_indices, 2] > 5.0
417
-
418
303
self .command_input = torch .cat ((sin_pos , cos_pos , self .commands [:, :3 ] * self .commands_scale ), dim = 1 )
419
304
q = (self .dof_pos - self .default_dof_pos ) * self .obs_scales ["dof_pos" ]
420
305
dq = self .dof_vel * self .obs_scales ["dof_vel" ]
421
306
422
- # diff = self.dof_pos - self.ref_dof_pos
423
- # 单帧privileged观测值(不包含历史)
424
- # self.privileged_obs_buf = torch.cat(
425
- # (
426
- # self.command_input,
427
- # (self.dof_pos - self.default_joint_pd_target) * self.obs_scales['dof_pos'],
428
- # self.dof_vel * self.obs_scales['dof_vel'],
429
- # self.actions,
430
- # diff,
431
- # self.base_lin_vel * self.obs_scales['lin_vel'],
432
- # self.base_ang_vel * self.obs_scales['ang_vel'],
433
- # self.base_euler * self.obs_scales['quat'],
434
- # self.rand_push_force[:, :2],
435
- # self.rand_push_torque,
436
- # self.env_frictions.unsqueeze(1),
437
- # (self.body_mass / 30.0).unsqueeze(1),
438
- # stance_mask,
439
- # contact_mask,
440
- # ),
441
- # dim=-1
442
- # )
443
-
444
307
obs_buf = torch .cat ( # total 41 dim
445
308
(
446
309
self .command_input , # 5 = 2D(sin cos) + 3D(vel_x, vel_y, aug_vel_yaw)
@@ -449,48 +312,12 @@ def compute_observations(self):
449
312
self .actions , # 10D
450
313
self .base_ang_vel * self .obs_scales ["ang_vel" ], # 3
451
314
self .base_euler * self .obs_scales ["quat" ], # 3
452
- # (self.measured_heights if self.measured_heights is not None else torch.zeros((self.num_envs, 1), device=self.device))\
453
- # * self.obs_scales["height_measurements"], # 1 .mean(dim=1, keepdim=True)
454
- # self.terrain_difficulty.unsqueeze(1), # 1
455
315
),
456
316
dim = - 1 ,
457
317
)
458
318
459
- # if self.measure_heights:
460
- # heights = (
461
- # torch.clip(
462
- # self.root_states[:, 2].unsqueeze(1) - 0.5 - self.measured_heights,
463
- # -1,
464
- # 1.0,
465
- # )
466
- # * self.obs_scales["height_measurements"]
467
- # )
468
- # self.privileged_obs_buf = torch.cat((self.obs_buf, heights), dim=-1)
469
-
470
- # if self.add_noise:
471
- # noise_level = self.obs_cfg.get("noise_level", 0.1)
472
- # obs_now = obs_buf.clone() + torch.randn_like(obs_buf) * self.noise_scale_vec * noise_level
473
- # else:
474
- # obs_now = obs_buf.clone()
475
- # self.obs_history.append(obs_now)
476
- # self.critic_history.append(self.privileged_obs_buf)
477
-
478
- # obs_buf_all = torch.stack([self.obs_history[i] for i in range(self.obs_history.maxlen)], dim=1) # N,T,K
479
- # Ensure deque length is sufficient
480
- # if len(self.obs_history) < self.obs_history.maxlen:
481
- # obs_buf_all = torch.stack([self.obs_history[i] for i in range(len(self.obs_history))], dim=1) # N,T,K
482
- # else:
483
- # obs_buf_all = torch.stack([self.obs_history[i] for i in range(self.obs_history.maxlen)], dim=1) # N,T,K
484
-
485
- # self.obs_buf = obs_buf_all.reshape(self.num_envs, -1) # N, T*K
486
319
self .obs_buf = obs_buf
487
320
488
- # 直接使用当前帧privileged观测值
489
- # if len(self.critic_history) < self.c_frame_stack:
490
- # self.privileged_obs_buf = torch.cat([self.critic_history[i] for i in range(len(self.critic_history))], dim=1)
491
- # else:
492
- # self.privileged_obs_buf = torch.cat([self.critic_history[i] for i in range(self.c_frame_stack)], dim=1)
493
-
494
321
def reset_idx (self , envs_idx ):
495
322
if len (envs_idx ) == 0 :
496
323
return
@@ -646,95 +473,3 @@ def _reward_gait_symmetry(self):
646
473
knee_symmetry = torch .abs (left_knee - right_knee )
647
474
648
475
return torch .exp (- (hip_symmetry + knee_symmetry ))
649
-
650
- # def _reward_energy_efficiency(self):
651
- # # Reward energy efficiency by penalizing high joint velocities
652
- # return self.reward_cfg["reward_scales"]["dof_vel"] * torch.sum(torch.square(self.dof_vel), dim=1)
653
-
654
- # def _reward_orientation(self):
655
- # # Penalize base orientation away from upright
656
- # return torch.exp(-torch.abs(self.base_euler[:, 0]) - torch.abs(self.base_euler[:, 1])) * self.reward_cfg["reward_scales"]["orientation"]
657
-
658
- # def _reward_terrain_adaptation(self):
659
- # """Reward for adapting to different terrain types"""
660
- # # Calculate foot clearance
661
- # foot_clearance = torch.zeros((self.num_envs, 2), device=self.device)
662
- # for i, foot_idx in enumerate(self.feet_indices):
663
- # foot_pos = self.robot.get_link(id=foot_idx).get_pos()
664
- # # 添加地形坐标边界检查
665
- # x_coords = torch.clamp(foot_pos[:, 0].long().cpu(), 0, self.terrain.terrain_hf.shape[0]-1)
666
- # y_coords = torch.clamp(foot_pos[:, 1].long().cpu(), 0, self.terrain.terrain_hf.shape[1]-1)
667
- # terrain_height = torch.tensor(self.terrain.terrain_hf[x_coords, y_coords]).to(self.device)
668
- # foot_clearance[:, i] = foot_pos[:, 2] - terrain_height
669
-
670
- # # Reward for maintaining appropriate foot clearance
671
- # target_clearance = 0.05 # 5cm
672
- # clearance_error = torch.abs(foot_clearance - target_clearance)
673
- # return torch.exp(-torch.mean(clearance_error, dim=1) / 0.02)
674
-
675
- # def _reward_terrain_stability(self):
676
- # """Reward for maintaining stability on uneven terrain"""
677
- # # Penalize large base orientation changes
678
- # base_euler = quat_to_xyz(self.base_quat)
679
- # return torch.exp(-torch.abs(base_euler[:, 0]) - torch.abs(base_euler[:, 1]))
680
-
681
- # def _reward_terrain_progress(self):
682
- # """Reward for making progress across different terrain types"""
683
- # # Calculate forward progress relative to terrain difficulty
684
- # forward_vel = self.base_lin_vel[:, 0]
685
-
686
- # # Difficulty factors based on terrain type
687
- # difficulty_factors = {
688
- # "flat_terrain": 0.1,
689
- # "random_uniform_terrain": 0.3,
690
- # "stepping_stones_terrain": 0.5,
691
- # "pyramid_sloped_terrain": 0.7,
692
- # "discrete_obstacles_terrain": 0.8,
693
- # "wave_terrain": 0.6,
694
- # "pyramid_stairs_terrain": 0.9,
695
- # "sloped_terrain": 0.4
696
- # }
697
-
698
- # # Get current terrain type
699
- # terrain_type = self.terrain.get_type(self.base_pos[:, :2])
700
- # difficulty = difficulty_factors.get(terrain_type, 0.5)
701
-
702
- # return forward_vel / (difficulty + 0.1)
703
-
704
- # def _reward_joint_pos(self):
705
- # """Calculates the reward based on the difference between the current joint positions and the target joint positions."""
706
- # joint_pos = self.dof_pos.clone()
707
- # pos_target = getattr(self, 'ref_dof_pos', self.default_dof_pos.repeat(self.num_envs, 1).to(self.device)).clone()
708
- # diff = joint_pos - pos_target
709
- # r = torch.exp(-2 * torch.norm(diff, dim=1)) - 0.2 * torch.norm(diff, dim=1).clamp(0, 0.5)
710
-
711
- # return r
712
-
713
- # def _reward_contact_stability(self):
714
- # """Reward for maintaining stable contact force distribution"""
715
- # # Get contact forces for feet
716
- # left_foot_force = self.net_contact_forces[:, self.feet_indices[0]]
717
- # right_foot_force = self.net_contact_forces[:, self.feet_indices[1]]
718
-
719
- # # Calculate force magnitude difference
720
- # force_diff = torch.norm(left_foot_force, dim=1) - torch.norm(right_foot_force, dim=1)
721
-
722
- # # Reward for balanced contact forces
723
- # return torch.exp(-torch.abs(force_diff) / 50.0)
724
-
725
- # def _reward_foot_slip(self):
726
- # """Calculates the reward for minimizing foot slip. The reward is based on the contact forces
727
- # and the speed of the feet. A contact threshold is used to determine if the foot is in contact
728
- # with the ground. The speed of the foot is calculated and scaled by the contact condition.
729
- # """
730
- # contact = self.contact_forces[:, self.feet_indices, 2] > 5.0
731
- # foot_speed_norm = torch.norm(self.rigid_state[:, self.feet_indices, 7:9], dim=2)
732
- # rew = torch.sqrt(foot_speed_norm)
733
- # rew *= contact
734
- # return torch.sum(rew, dim=1)
735
-
736
- # def _reward_dof_vel(self):
737
- # """Penalizes high velocities at the degrees of freedom (DOF) of the robot. This encourages smoother and
738
- # more controlled movements.
739
- # """
740
- # return torch.sum(torch.square(self.dof_vel), dim=1)
0 commit comments