Skip to content

Commit

Permalink
Add additional demos for gravity compensation and sequential joint im…
Browse files Browse the repository at this point in the history
…pedance.

Additional refactoring for demo:
* abstracting functions from Joint Space and Op Space Demos.
* Demo classes for Gravity Compensation Demos, Individiual Joint Impedance Demos.
* README for demos.
  • Loading branch information
rohun33 committed Jan 27, 2021
1 parent 81be478 commit 0bcef15
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 120 deletions.
65 changes: 60 additions & 5 deletions demos/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import numpy as np
import time
from datetime import datetime
import logging
logger = logging.getLogger(__name__)

class Demo(object):
"""Abstract class definition for demonstration.
Expand All @@ -27,10 +29,10 @@ class Demo(object):
initial_pose (list): initial end-effector pose.
"""
def __init__(self,
ctrl_type,
demo_type,
test_fn,
def __init__(self,
ctrl_type,
demo_type,
test_fn,
config_file="demo_control_cfg.yaml",
**kwargs):
self.ctrl_type = ctrl_type
Expand Down Expand Up @@ -62,7 +64,7 @@ def __init__(self,
@property
def world_type(self):
return self.env.config['world']['type']

def get_action_list(self):
raise NotImplementedError

Expand All @@ -86,6 +88,59 @@ def connect_and_reset_robot(self):

self.env.robot_interface.reset()

def print_demo_banner(self):
print("\n**************************************************************************************")
print("\n*********************** perls2 Controller Demo **********************************")

def print_demo_info(self):
print("\n\t Running {} demo \n\t with control type {}. \
\n\t Test function {}".format(
self.ctrl_type, self.demo_type, self.test_fn))

def run(self):
"""Run the demo. Execute actions in sequence and calculate error.
"""
self.print_demo_banner()
self.print_demo_info()
logger.debug("EE Pose initial:\n{}\n".format(self.env.robot_interface.ee_pose))
print("--------")
input("Press Enter to start sending commands.")
self.step_through_demo()
self.env.robot_interface.reset()
self.env.robot_interface.disconnect()

if self.plot_error:
self.plot_errors()
if self.plot_pos:
self.plot_positions()

if self.save:
self.save_data()

def step_through_demo(self):
for i, goal_pose in enumerate(self.goal_poses):

# Get action corresponding to test_fn and goal pose
action= self.get_action(goal_pose, self.get_state())
action_kwargs = self.get_action_kwargs(action)

# Step environment forward
obs, reward, done, info = self.env.step(action_kwargs, time.time())
self.observations.append(obs)
self.actions.append(action)
new_state = self.get_state()
self.states.append(new_state)
self.errors.append(
self.get_delta(goal_pose, new_state))

def reset_log(self):
"""Reset states, actions and observation logs between demos.
"""
self.states = []
self.errors = []
self.actions = []
self.observations = []

# def make_demo(**kwargs):
# """Factory method for making the write demo per params.
# """
Expand Down
205 changes: 147 additions & 58 deletions demos/joint_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
"""

from demo_envs import JointDemoEnv
from demo_path import SequentialJoint
from demo_path import RampSingleJoint
from demo import Demo
import numpy as np
import logging
logger = logging.getLogger(__name__)
import time

import matplotlib.pyplot as plt
# hide verbose matplotlib logging
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)

MAX_JOINT_DELTA = 0.005
NUM_JOINTS = 6

class JointSpaceDemo(Demo):

Expand Down Expand Up @@ -53,74 +59,23 @@ def __init__(self,
test_fn=self.test_fn,
ctrl_type=self.ctrl_type)

self.joint_num = kwargs['joint_num']

# Joint torque requires edge case to not include gravity comp.
if ctrl_type == "JointTorque":
self.init_joint_state = [0]*7;
else:
self.init_joint_state = self.get_state()

if (delta_val > MAX_JOINT_DELTA):
logger.warn("Specified joint delta_val exceeds limit, clipping to {} rad".format(MAX_JOINT_DELTA))
delta_val = MAX_JOINT_DELTA

self.delta_val = delta_val
self.delta_val = self.clip_delta(delta_val)
self.num_steps = num_steps

self.path = SequentialJoint(start_pose=self.init_joint_state,
delta_val=self.delta_val,
joint_num=self.joint_num,
num_steps=self.num_steps)

self.goal_poses = self.path.path


self.step_num = 0

self.connect_and_reset_robot()

def get_state(self):
if self.ctrl_type == "JointTorque":
return self.env.robot_interface.tau[:7]
return np.subtract(self.env.robot_interface.tau[:7], self.env.robot_interface.gravity_vector)
elif self.ctrl_type == "JointImpedance":
return self.env.robot_interface.q[:7]
elif self.ctrl_type == "JointVelocity":
return self.env.robot_interface.dq[:7]
else:
raise ValueError("Invalid ctrl type.")

def run(self):
"""Run the demo. Execute actions in sequence and calculate error.
def get_delta(self, goal_pose, current_pose):
"""Get delta between joint space poses.
"""
logger.info("Running {} demo \n with control type {}.\n Test \
function {} on Joint Num {}".format(self.demo_type, self.ctrl_type, self.test_fn, self.joint_num))

logger.debug("Initial joint pose \n{}".format(self.env.robot_interface.q))

input("Press enter to confirm and begin running demo.")

for i, goal_pose in enumerate(self.goal_poses):
action = self.get_action(goal_pose, self.get_state())
action_kwargs = self.get_action_kwargs(action)
logger.debug("Action for joint num {} \t{}".format(self.joint_num, action[self.joint_num]))

self.env.step(action_kwargs, time.time())
self.actions.append(action)
new_state = self.get_state()

self.states.append(new_state)
self.errors.append(
self.compute_error(goal_pose, new_state))

logger.debug("Final (actual) joint pose \n{}".format(self.env.robot_interface.q))

self.env.robot_interface.reset()
self.env.robot_interface.disconnect()
if self.plot_error:
self.plot_errors()
if self.plot_pos:
self.plot_positions()
return np.subtract(goal_pose, current_pose)

def get_action_kwargs(self, action):
"""
Expand Down Expand Up @@ -243,6 +198,140 @@ def plot_errors(self):
plt.savefig(fname)
plt.show()

def clip_delta(self, delta_val):
"""Clip delta value to max, preserving sign.
"""
return delta_val


class SingleJointDemo(JointSpaceDemo):
"""Joint Space demonstration for a single joint.
"""
def __init__(self,
config_file="demo_control_cfg.yaml",
delta_val=0.005, num_steps=30,
test_fn='set_joint_delta',
joint_num=6,
**kwargs):
super().__init__(ctrl_type="JointImpedance",
demo_type="SingleJoint",
config_file=config_file,
delta_val=delta_val,
num_steps=num_steps,
test_fn=test_fn,
**kwargs)

self.joint_num = joint_num
self.connect_and_reset_robot()
self.init_joint_state = self.get_state()

self.path = RampSingleJoint(start_pose=self.init_joint_state,
delta_val=self.delta_val,
joint_num=self.joint_num,
num_steps=self.num_steps)

self.goal_poses = self.path.path

self.step_num = 0

def clip_delta(self, delta_val):
"""Clip delta to max value for joint impedance control.
"""
if (np.abs(delta_val) > MAX_JOINT_DELTA):
logger.warn("Specified joint delta_val exceeds limit, clipping to {} rad".format(MAX_JOINT_DELTA))
delta_val = np.sign(delta_val) * MAX_JOINT_DELTA
return delta_val


class GravityCompDemo(JointSpaceDemo):
"""Gravity Compensation to demonstrate "floating" arm.
"""
def __init__(self,
config_file="demo_control_cfg.yaml",
num_steps=100,
**kwargs):

super().__init__(ctrl_type="JointTorque",
demo_type="GravityCompensation",
config_file=config_file,
delta_val=0.0,
num_steps=num_steps,
test_fn="set_joint_torques",
**kwargs)

self.connect_and_reset_robot()
self.init_joint_state = np.zeros(7)
self.path = RampSingleJoint(start_pose=self.init_joint_state,
delta_val=self.delta_val,
joint_num=0,
num_steps=self.num_steps)

self.goal_poses = self.path.path

self.step_num = 0

def print_demo_info(self):
print("\n\t Running Gravity compensation demo")


class JointImpDemoSeq(SingleJointDemo):
def __init__(self,
config_file="demo_control_cfg.yaml",
delta_val=0.005, num_steps=30,
test_fn='set_joint_delta',
**kwargs):
super().__init__(config_file=config_file,
delta_val=delta_val,
num_steps=num_steps,
test_fn=test_fn,**kwargs)

def print_demo_info(self):
print("\n\tRunning Joint Impedance demo for all joints \n with test_function: {}".format(self.test_fn))
print("Robot will perform 6 demonstrations: \n Moving joints 0-6 (base to end-effector) individually. \nRobot will reset before continuing to next demo.")


def run_single_joint_demo(self, joint_num):
self.reset_log()
self.joint_num = joint_num
self.path = RampSingleJoint(start_pose=self.init_joint_state,
delta_val=self.delta_val,
joint_num=self.joint_num,
num_steps=self.num_steps)
self.goal_poses = self.path.path
self.step_num = 0

print("------------------------")
print("Running Joint Impedance Demo on joint num {} \n \
\nTest function {}".format(
self.joint_num, self.ctrl_type, self.test_fn))
logger.debug("EE Pose initial:\n{}\n".format(self.env.robot_interface.ee_pose))
print("--------")
input("Press Enter to start sending commands.")
self.step_through_demo()
self.env.robot_interface.reset()
if self.plot_error:
self.plot_errors()
if self.plot_pos:
self.plot_positions()

if self.save:
self.save_data()

def run(self):
"""Run the demo. Execute actions in sequence and calculate error.
"""
self.print_demo_banner()
self.print_demo_info()

# Run joint impedance demos for each joint
for joint_index in range(NUM_JOINTS+1):
self.run_single_joint_demo(joint_index)

print("Demo complete...disconnecting robot.")
self.env.robot_interface.disconnect()


if __name__ == '__main__':
import argparse

Expand Down Expand Up @@ -280,5 +369,5 @@ def plot_errors(self):

args = parser.parse_args()
kwargs = vars(args)
demo = JointSpaceDemo(**kwargs)
demo = SingleJointDemo(**kwargs)
demo.run()
Loading

0 comments on commit 0bcef15

Please sign in to comment.