diff --git a/demos/demo.py b/demos/demo.py index 577591b7..cbd3b98e 100644 --- a/demos/demo.py +++ b/demos/demo.py @@ -4,6 +4,8 @@ import numpy as np import time from datetime import datetime +import logging +logger = logging.getLogger(__name__) class Demo(object): """Abstract class definition for demonstration. @@ -27,10 +29,10 @@ class Demo(object): initial_pose (list): initial end-effector pose. """ - def __init__(self, - ctrl_type, - demo_type, - test_fn, + def __init__(self, + ctrl_type, + demo_type, + test_fn, config_file="demo_control_cfg.yaml", **kwargs): self.ctrl_type = ctrl_type @@ -62,7 +64,7 @@ def __init__(self, @property def world_type(self): return self.env.config['world']['type'] - + def get_action_list(self): raise NotImplementedError @@ -86,6 +88,59 @@ def connect_and_reset_robot(self): self.env.robot_interface.reset() + def print_demo_banner(self): + print("\n**************************************************************************************") + print("\n*********************** perls2 Controller Demo **********************************") + + def print_demo_info(self): + print("\n\t Running {} demo \n\t with control type {}. \ + \n\t Test function {}".format( + self.ctrl_type, self.demo_type, self.test_fn)) + + def run(self): + """Run the demo. Execute actions in sequence and calculate error. + """ + self.print_demo_banner() + self.print_demo_info() + logger.debug("EE Pose initial:\n{}\n".format(self.env.robot_interface.ee_pose)) + print("--------") + input("Press Enter to start sending commands.") + self.step_through_demo() + self.env.robot_interface.reset() + self.env.robot_interface.disconnect() + + if self.plot_error: + self.plot_errors() + if self.plot_pos: + self.plot_positions() + + if self.save: + self.save_data() + + def step_through_demo(self): + for i, goal_pose in enumerate(self.goal_poses): + + # Get action corresponding to test_fn and goal pose + action= self.get_action(goal_pose, self.get_state()) + action_kwargs = self.get_action_kwargs(action) + + # Step environment forward + obs, reward, done, info = self.env.step(action_kwargs, time.time()) + self.observations.append(obs) + self.actions.append(action) + new_state = self.get_state() + self.states.append(new_state) + self.errors.append( + self.get_delta(goal_pose, new_state)) + + def reset_log(self): + """Reset states, actions and observation logs between demos. + """ + self.states = [] + self.errors = [] + self.actions = [] + self.observations = [] + # def make_demo(**kwargs): # """Factory method for making the write demo per params. # """ diff --git a/demos/joint_demo.py b/demos/joint_demo.py index c9de131a..9f90fe8c 100644 --- a/demos/joint_demo.py +++ b/demos/joint_demo.py @@ -2,14 +2,20 @@ """ from demo_envs import JointDemoEnv -from demo_path import SequentialJoint +from demo_path import RampSingleJoint from demo import Demo import numpy as np import logging logger = logging.getLogger(__name__) import time +import matplotlib.pyplot as plt +# hide verbose matplotlib logging +mpl_logger = logging.getLogger('matplotlib') +mpl_logger.setLevel(logging.WARNING) + MAX_JOINT_DELTA = 0.005 +NUM_JOINTS = 6 class JointSpaceDemo(Demo): @@ -53,36 +59,12 @@ def __init__(self, test_fn=self.test_fn, ctrl_type=self.ctrl_type) - self.joint_num = kwargs['joint_num'] - - # Joint torque requires edge case to not include gravity comp. - if ctrl_type == "JointTorque": - self.init_joint_state = [0]*7; - else: - self.init_joint_state = self.get_state() - - if (delta_val > MAX_JOINT_DELTA): - logger.warn("Specified joint delta_val exceeds limit, clipping to {} rad".format(MAX_JOINT_DELTA)) - delta_val = MAX_JOINT_DELTA - - self.delta_val = delta_val + self.delta_val = self.clip_delta(delta_val) self.num_steps = num_steps - self.path = SequentialJoint(start_pose=self.init_joint_state, - delta_val=self.delta_val, - joint_num=self.joint_num, - num_steps=self.num_steps) - - self.goal_poses = self.path.path - - - self.step_num = 0 - - self.connect_and_reset_robot() - def get_state(self): if self.ctrl_type == "JointTorque": - return self.env.robot_interface.tau[:7] + return np.subtract(self.env.robot_interface.tau[:7], self.env.robot_interface.gravity_vector) elif self.ctrl_type == "JointImpedance": return self.env.robot_interface.q[:7] elif self.ctrl_type == "JointVelocity": @@ -90,37 +72,10 @@ def get_state(self): else: raise ValueError("Invalid ctrl type.") - def run(self): - """Run the demo. Execute actions in sequence and calculate error. + def get_delta(self, goal_pose, current_pose): + """Get delta between joint space poses. """ - logger.info("Running {} demo \n with control type {}.\n Test \ - function {} on Joint Num {}".format(self.demo_type, self.ctrl_type, self.test_fn, self.joint_num)) - - logger.debug("Initial joint pose \n{}".format(self.env.robot_interface.q)) - - input("Press enter to confirm and begin running demo.") - - for i, goal_pose in enumerate(self.goal_poses): - action = self.get_action(goal_pose, self.get_state()) - action_kwargs = self.get_action_kwargs(action) - logger.debug("Action for joint num {} \t{}".format(self.joint_num, action[self.joint_num])) - - self.env.step(action_kwargs, time.time()) - self.actions.append(action) - new_state = self.get_state() - - self.states.append(new_state) - self.errors.append( - self.compute_error(goal_pose, new_state)) - - logger.debug("Final (actual) joint pose \n{}".format(self.env.robot_interface.q)) - - self.env.robot_interface.reset() - self.env.robot_interface.disconnect() - if self.plot_error: - self.plot_errors() - if self.plot_pos: - self.plot_positions() + return np.subtract(goal_pose, current_pose) def get_action_kwargs(self, action): """ @@ -243,6 +198,140 @@ def plot_errors(self): plt.savefig(fname) plt.show() + def clip_delta(self, delta_val): + """Clip delta value to max, preserving sign. + """ + return delta_val + + +class SingleJointDemo(JointSpaceDemo): + """Joint Space demonstration for a single joint. + """ + def __init__(self, + config_file="demo_control_cfg.yaml", + delta_val=0.005, num_steps=30, + test_fn='set_joint_delta', + joint_num=6, + **kwargs): + super().__init__(ctrl_type="JointImpedance", + demo_type="SingleJoint", + config_file=config_file, + delta_val=delta_val, + num_steps=num_steps, + test_fn=test_fn, + **kwargs) + + self.joint_num = joint_num + self.connect_and_reset_robot() + self.init_joint_state = self.get_state() + + self.path = RampSingleJoint(start_pose=self.init_joint_state, + delta_val=self.delta_val, + joint_num=self.joint_num, + num_steps=self.num_steps) + + self.goal_poses = self.path.path + + self.step_num = 0 + + def clip_delta(self, delta_val): + """Clip delta to max value for joint impedance control. + + """ + if (np.abs(delta_val) > MAX_JOINT_DELTA): + logger.warn("Specified joint delta_val exceeds limit, clipping to {} rad".format(MAX_JOINT_DELTA)) + delta_val = np.sign(delta_val) * MAX_JOINT_DELTA + return delta_val + + +class GravityCompDemo(JointSpaceDemo): + """Gravity Compensation to demonstrate "floating" arm. + """ + def __init__(self, + config_file="demo_control_cfg.yaml", + num_steps=100, + **kwargs): + + super().__init__(ctrl_type="JointTorque", + demo_type="GravityCompensation", + config_file=config_file, + delta_val=0.0, + num_steps=num_steps, + test_fn="set_joint_torques", + **kwargs) + + self.connect_and_reset_robot() + self.init_joint_state = np.zeros(7) + self.path = RampSingleJoint(start_pose=self.init_joint_state, + delta_val=self.delta_val, + joint_num=0, + num_steps=self.num_steps) + + self.goal_poses = self.path.path + + self.step_num = 0 + + def print_demo_info(self): + print("\n\t Running Gravity compensation demo") + + +class JointImpDemoSeq(SingleJointDemo): + def __init__(self, + config_file="demo_control_cfg.yaml", + delta_val=0.005, num_steps=30, + test_fn='set_joint_delta', + **kwargs): + super().__init__(config_file=config_file, + delta_val=delta_val, + num_steps=num_steps, + test_fn=test_fn,**kwargs) + + def print_demo_info(self): + print("\n\tRunning Joint Impedance demo for all joints \n with test_function: {}".format(self.test_fn)) + print("Robot will perform 6 demonstrations: \n Moving joints 0-6 (base to end-effector) individually. \nRobot will reset before continuing to next demo.") + + + def run_single_joint_demo(self, joint_num): + self.reset_log() + self.joint_num = joint_num + self.path = RampSingleJoint(start_pose=self.init_joint_state, + delta_val=self.delta_val, + joint_num=self.joint_num, + num_steps=self.num_steps) + self.goal_poses = self.path.path + self.step_num = 0 + + print("------------------------") + print("Running Joint Impedance Demo on joint num {} \n \ + \nTest function {}".format( + self.joint_num, self.ctrl_type, self.test_fn)) + logger.debug("EE Pose initial:\n{}\n".format(self.env.robot_interface.ee_pose)) + print("--------") + input("Press Enter to start sending commands.") + self.step_through_demo() + self.env.robot_interface.reset() + if self.plot_error: + self.plot_errors() + if self.plot_pos: + self.plot_positions() + + if self.save: + self.save_data() + + def run(self): + """Run the demo. Execute actions in sequence and calculate error. + """ + self.print_demo_banner() + self.print_demo_info() + + # Run joint impedance demos for each joint + for joint_index in range(NUM_JOINTS+1): + self.run_single_joint_demo(joint_index) + + print("Demo complete...disconnecting robot.") + self.env.robot_interface.disconnect() + + if __name__ == '__main__': import argparse @@ -280,5 +369,5 @@ def plot_errors(self): args = parser.parse_args() kwargs = vars(args) - demo = JointSpaceDemo(**kwargs) + demo = SingleJointDemo(**kwargs) demo.run() \ No newline at end of file diff --git a/demos/osc_demo.py b/demos/osc_demo.py index 760a28c7..3af23162 100644 --- a/demos/osc_demo.py +++ b/demos/osc_demo.py @@ -39,30 +39,6 @@ def compute_error(goal_state, new_state): return np.hstack((pos_error, ori_error)) -def get_delta(goal_pose, current_pose): - """Get delta between goal pose and current_pose. - - Args: goal_pose (list): 7f pose [x, y, z, qx, qy, qz, w] . Position and quaternion of goal pose. - current_pose (list): 7f Position and quaternion of current pose. - - Returns: delta (list) 6f [dx, dy, dz, ax, ay, az] delta position and delta orientation - as axis-angle. - """ - - if len(goal_pose) != 7: - raise ValueError("Goal pose incorrect dimension should be 7f") - if len(current_pose) !=7: - raise ValueError("Current pose incorrect dimension, should be 7f") - - dpos = np.subtract(goal_pose[:3], current_pose[:3]) - goal_mat = T.quat2mat(goal_pose[3:]) - current_mat = T.quat2mat(current_pose[3:]) - delta_mat_T = np.dot(goal_mat, current_mat.T) - delta_quat = T.mat2quat(np.transpose(delta_mat_T)) - delta_aa = T.quat2axisangle(delta_quat) - - return np.hstack((dpos, delta_aa)).tolist() - class OpSpaceDemo(Demo): """Operational space demo for testing EEImpedance and EEPosture control. @@ -188,16 +164,6 @@ def get_action(self, goal_pose, current_pose): raise ValueError("invalid test fn") return action - def print_demo_banner(self): - print("\n**************************************************************************************") - print("\n*********************** perls2 Controller Demo **********************************") - - def print_demo_info(self): - print("\n\t Running {} demo \n\t with control type {}. \ - \n\t Test function {}".format( - self.ctrl_type, self.demo_type, self.test_fn)) - - def run(self): """Run the demo. Execute actions in sequence and calculate error. """ @@ -218,28 +184,36 @@ def run(self): if self.save: self.save_data() - def step_through_demo(self): - for i, goal_pose in enumerate(self.goal_poses): - - # Get action corresponding to test_fn and goal pose - action= self.get_action(goal_pose, self.get_state()) - action_kwargs = self.get_action_kwargs(action) - - # Step environment forward - obs, reward, done, info = self.env.step(action_kwargs, time.time()) - self.observations.append(obs) - self.actions.append(action) - new_state = self.get_state() - self.states.append(new_state) - self.errors.append( - get_delta(goal_pose, new_state)) - def get_action_list(self): if (self.test_fn == "move_ee_delta"): return self.path.deltas elif (self.test_fn == "set_ee_pose"): return self.path.path + def get_delta(self, goal_pose, current_pose): + """Get delta between goal pose and current_pose. + + Args: goal_pose (list): 7f pose [x, y, z, qx, qy, qz, w] . Position and quaternion of goal pose. + current_pose (list): 7f Position and quaternion of current pose. + + Returns: delta (list) 6f [dx, dy, dz, ax, ay, az] delta position and delta orientation + as axis-angle. + """ + + if len(goal_pose) != 7: + raise ValueError("Goal pose incorrect dimension should be 7f") + if len(current_pose) !=7: + raise ValueError("Current pose incorrect dimension, should be 7f") + + dpos = np.subtract(goal_pose[:3], current_pose[:3]) + goal_mat = T.quat2mat(goal_pose[3:]) + current_mat = T.quat2mat(current_pose[3:]) + delta_mat_T = np.dot(goal_mat, current_mat.T) + delta_quat = T.mat2quat(np.transpose(delta_mat_T)) + delta_aa = T.quat2axisangle(delta_quat) + + return np.hstack((dpos, delta_aa)).tolist() + def plot_sequence_arrows(self, axes, x, y, color): """Plot a sequence of arrows given a list of x y points. """ @@ -474,13 +448,31 @@ def run_demo_axis(self, axis): self.save_data() - def reset_log(self): - """Reset states, actions and observation logs between demos. - """ - self.states = [] - self.errors = [] - self.actions = [] - self.observations = [] +class OpSpaceFixedPoseDemo(OpSpaceLineDemo): + """OpSpace Demo for maintaining initial ee pose. + """ + def __init__(self, + ctrl_type="EEImpedance", + test_fn="move_ee_delta", + config_file="demo_control_cfg.yaml", + num_steps=100, **kwargs): + + super().__init__( + ctrl_type=ctrl_type, + test_fn=test_fn, + config_file=config_file, + axis='x', + path_length=None, + delta_val=0.0, + num_steps=num_steps, **kwargs) + + def print_demo_info(self): + print("\n\t Running OSC fixed ee-pose demo \n\t with control type {}. \ + \n\t Test function {}".format( + self.ctrl_type, self.demo_type, self.test_fn)) + print("Robot will hold initial end-effector pose:\n\t {}".format(self.initial_pose)) + + class OpSpaceSquareDemo(OpSpaceDemo): """OpSpace Demo for square path in xy plane. diff --git a/demos/run_gc_demo.py b/demos/run_gc_demo.py new file mode 100644 index 00000000..5e7142c0 --- /dev/null +++ b/demos/run_gc_demo.py @@ -0,0 +1,29 @@ +"""Script to demonstrate "Floating" Robot Demo. + +Joint torques are set to 0. +Robot gravity compensation must be enabled! +""" +from joint_demo import GravityCompDemo +import argparse + +parser = argparse.ArgumentParser( + description="Test controllers and measure errors.") + +parser.add_argument('--num_steps', default=500, type=int, + help="max steps for demo.") +parser.add_argument('--plot_pos', action="store_true", + help="whether to plot positions of demo.") +parser.add_argument('--plot_error', action="store_true", + help="whether to plot errors.") +parser.add_argument('--save', action="store_true", + help="whether to store data to file") +parser.add_argument('--demo_name', default=None, + type=str, help="Valid filename for demo.") +parser.add_argument('--save_fig', action="store_true", + help="whether to save pngs of plots") +parser.add_argument('--config_file', default='demos/demo_control_cfg.yaml', help='absolute filepath for config file.') + +args = parser.parse_args() +kwargs = vars(args) +demo = GravityCompDemo(**kwargs) +demo.run() \ No newline at end of file diff --git a/demos/run_jointimp_demo.py b/demos/run_jointimp_demo.py new file mode 100644 index 00000000..41b9a3c3 --- /dev/null +++ b/demos/run_jointimp_demo.py @@ -0,0 +1,28 @@ +"""Script to run Joint Impedance Demo. +""" +from joint_demo import JointImpDemoSeq +import argparse + +parser = argparse.ArgumentParser( + description="Test controllers and measure errors.") +parser.add_argument('--delta_val', + default=0.01, type=float, + help="Max step size (m or rad) to take for demo.") +parser.add_argument('--num_steps', default=50, type=int, + help="max steps for demo.") +parser.add_argument('--plot_pos', action="store_true", + help="whether to plot positions of demo.") +parser.add_argument('--plot_error', action="store_true", + help="whether to plot errors.") +parser.add_argument('--save', action="store_true", + help="whether to store data to file") +parser.add_argument('--demo_name', default=None, + type=str, help="Valid filename for demo.") +parser.add_argument('--save_fig', action="store_true", + help="whether to save pngs of plots") +parser.add_argument('--config_file', default='demos/demo_control_cfg.yaml', help='absolute filepath for config file.') + +args = parser.parse_args() +kwargs = vars(args) +demo = JointImpDemoSeq(**kwargs) +demo.run()