Skip to content

Commit

Permalink
torque settings & speeds
Browse files Browse the repository at this point in the history
  • Loading branch information
hatomist committed Oct 24, 2024
1 parent b20ad99 commit af3116a
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 34 deletions.
2 changes: 1 addition & 1 deletion openlch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "0.5.3"
__version__ = "0.6.0"

from .hal import HAL
38 changes: 33 additions & 5 deletions openlch/hal.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ class Servo:
def __init__(self, stub):
self.__stub = stub

def get_positions(self) -> List[Tuple[int, float]]:
def get_positions(self) -> List[Tuple[int, float, float]]:
"""
Get current positions of all servos.
Get current positions and speeds of all servos.
Returns:
List[Tuple[int, float]]: A list of tuples containing servo IDs and their positions.
List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
"""
response = self.__stub.GetPositions(hal_pb_pb2.Empty())
return [(pos.id, pos.position) for pos in response.positions]
return [(pos.id, pos.position, pos.speed) for pos in response.positions]

def set_positions(self, positions: List[Tuple[int, float]]) -> None:
"""
Expand All @@ -52,7 +52,7 @@ def set_positions(self, positions: List[Tuple[int, float]]) -> None:
positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
"""
joint_positions = [
hal_pb_pb2.JointPosition(id=id, position=position)
hal_pb_pb2.JointPosition(id=id, position=position, speed=0)
for id, position in positions
]
request = hal_pb_pb2.JointPositions(positions=joint_positions)
Expand Down Expand Up @@ -191,6 +191,34 @@ def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
'calibrating_servo_id': response.calibrating_servo_id
}

def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
"""
Set torque for multiple servos.
Args:
torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
"""
settings = [
hal_pb_pb2.TorqueSetting(id=id, torque=torque)
for id, torque in torque_settings
]
request = hal_pb_pb2.TorqueSettings(settings=settings)
self.__stub.SetTorque(request)

def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
"""
Enable or disable torque for multiple servos.
Args:
enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
"""
settings = [
hal_pb_pb2.TorqueEnableSetting(id=id, enable=enable)
for id, enable in enable_settings
]
request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
self.__stub.SetTorqueEnable(request)

class System:
"""Class for system-related operations."""

Expand Down
64 changes: 36 additions & 28 deletions openlch/hal_pb_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions openlch/hal_pb_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ def __init__(self, channel):
request_serializer=hal__pb__pb2.Empty.SerializeToString,
response_deserializer=hal__pb__pb2.CalibrationStatus.FromString,
_registered_method=True)
self.SetTorque = channel.unary_unary(
'/hal_pb.ServoControl/SetTorque',
request_serializer=hal__pb__pb2.TorqueSettings.SerializeToString,
response_deserializer=hal__pb__pb2.Empty.FromString,
_registered_method=True)
self.SetTorqueEnable = channel.unary_unary(
'/hal_pb.ServoControl/SetTorqueEnable',
request_serializer=hal__pb__pb2.TorqueEnableSettings.SerializeToString,
response_deserializer=hal__pb__pb2.Empty.FromString,
_registered_method=True)


class ServoControlServicer(object):
Expand Down Expand Up @@ -171,6 +181,18 @@ def GetCalibrationStatus(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def SetTorque(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def SetTorqueEnable(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_ServoControlServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand Down Expand Up @@ -234,6 +256,16 @@ def add_ServoControlServicer_to_server(servicer, server):
request_deserializer=hal__pb__pb2.Empty.FromString,
response_serializer=hal__pb__pb2.CalibrationStatus.SerializeToString,
),
'SetTorque': grpc.unary_unary_rpc_method_handler(
servicer.SetTorque,
request_deserializer=hal__pb__pb2.TorqueSettings.FromString,
response_serializer=hal__pb__pb2.Empty.SerializeToString,
),
'SetTorqueEnable': grpc.unary_unary_rpc_method_handler(
servicer.SetTorqueEnable,
request_deserializer=hal__pb__pb2.TorqueEnableSettings.FromString,
response_serializer=hal__pb__pb2.Empty.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'hal_pb.ServoControl', rpc_method_handlers)
Expand Down Expand Up @@ -568,3 +600,57 @@ def GetCalibrationStatus(request,
timeout,
metadata,
_registered_method=True)

@staticmethod
def SetTorque(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/hal_pb.ServoControl/SetTorque',
hal__pb__pb2.TorqueSettings.SerializeToString,
hal__pb__pb2.Empty.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def SetTorqueEnable(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/hal_pb.ServoControl/SetTorqueEnable',
hal__pb__pb2.TorqueEnableSettings.SerializeToString,
hal__pb__pb2.Empty.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

0 comments on commit af3116a

Please sign in to comment.