diff --git a/EosLib/format/base_format.py b/EosLib/format/base_format.py new file mode 100644 index 0000000..2eca469 --- /dev/null +++ b/EosLib/format/base_format.py @@ -0,0 +1,23 @@ +from abc import ABC +from abc import abstractmethod +from typing_extensions import Self + +import EosLib + + +class BaseFormat(ABC): + @abstractmethod + def __init__(self): + raise NotImplementedError + + @abstractmethod + def encode(self) -> bytes: + raise NotImplementedError + + def encode_for_transmit(self) -> bytes: + return self.encode()[0:EosLib.packet.Packet.radio_body_max_bytes] + + @classmethod + @abstractmethod + def decode(cls, data: bytes | EosLib.packet.Packet) -> Self: + raise NotImplementedError diff --git a/EosLib/format/csv_format.py b/EosLib/format/csv_format.py new file mode 100644 index 0000000..2f9829b --- /dev/null +++ b/EosLib/format/csv_format.py @@ -0,0 +1,20 @@ +from abc import ABC +from abc import abstractmethod +from typing_extensions import Self + +from EosLib.format.base_format import BaseFormat + + +class CsvFormat(BaseFormat, ABC): + @abstractmethod + def get_csv_headers(self): + raise NotImplementedError + + @abstractmethod + def encode_to_csv(self) -> str: + raise NotImplementedError + + @classmethod + @abstractmethod + def decode_from_csv(cls, csv: str) -> Self: + raise NotImplementedError diff --git a/EosLib/format/position.py b/EosLib/format/position.py index c98c14a..bdc1f55 100644 --- a/EosLib/format/position.py +++ b/EosLib/format/position.py @@ -1,9 +1,11 @@ +import datetime import struct -from datetime import datetime from enum import IntEnum +from typing_extensions import Self -from EosLib import Type -from EosLib.packet.packet import Packet +import EosLib +from EosLib.format.csv_format import CsvFormat +from EosLib.packet import Packet class FlightState(IntEnum): @@ -14,8 +16,8 @@ class FlightState(IntEnum): DESCENT = 4 -class Position: - # Struct format is: timestamp, lat, long, speed, altitude, number of satellites, flight state +class Position(CsvFormat): + # Struct format is: GPS timestamp, lat, long, speed, altitude, number of satellites, flight state gps_struct_string = "!" \ "d" \ "d" \ @@ -25,51 +27,80 @@ class Position: "H" \ "B" - def __init__(self): - self.local_time = None - self.timestamp = None - self.latitude = None - self.longitude = None - self.altitude = None - self.speed = None - self.number_of_satellites = None - self.valid = False - self.flight_state = FlightState.NOT_SET - - # TODO: figure out a more legitimate way to check validity - def set_validity(self): + def __init__(self, + gps_time: datetime.datetime.now(), + latitude: float, + longitude: float, + speed: float, + altitude: float, + number_of_satellites: int, + flight_state: FlightState = FlightState.NOT_SET): + self.gps_time = gps_time + self.latitude = latitude + self.longitude = longitude + self.altitude = altitude + self.speed = speed + self.number_of_satellites = number_of_satellites + self.flight_state = flight_state + + self.valid = self.get_validity() + + def encode(self) -> bytes: + return struct.pack(self.gps_struct_string, + self.gps_time.timestamp(), + self.latitude, + self.longitude, + self.speed, + self.altitude, + self.number_of_satellites, + self.flight_state) + + @classmethod + def decode(cls, data: bytes | Packet) -> Self: + if isinstance(data, Packet): + if data.data_header.data_type != EosLib.Type.POSITION: + raise ValueError("Attempted to decode a non-position packet using Position") + else: + data = data.body + + unpacked_data = struct.unpack(cls.gps_struct_string, data) + + return Position(datetime.datetime.fromtimestamp(unpacked_data[0]), + unpacked_data[1], + unpacked_data[2], + unpacked_data[3], + unpacked_data[4], + unpacked_data[5], + FlightState(unpacked_data[6])) + + def get_csv_headers(self): + return ["GPS Timestamp", "Latitude", "Longitude", "Speed", "Altitude", "Number of Satellites", "Flight State"] + + def encode_to_csv(self) -> str: + return ",".join([self.gps_time.isoformat(), + str(self.latitude), + str(self.longitude), + str(self.speed), + str(self.altitude), + str(self.number_of_satellites), + str(self.flight_state.value)]) + + @classmethod + def decode_from_csv(cls, csv: str) -> Self: + csv_list = csv.split(",") + + return Position(datetime.datetime.fromisoformat(csv_list[0]), + float(csv_list[1]), + float(csv_list[2]), + float(csv_list[3]), + float(csv_list[4]), + int(csv_list[5]), + FlightState(int(csv_list[6]))) + + def get_validity(self): if (self.number_of_satellites < 4 or self.latitude == 0 or self.longitude == 0): - self.valid = False - else: - self.valid = True - - @staticmethod - def decode_position(gps_packet: Packet | bytes): - new_position = Position() - - if isinstance(gps_packet, Packet): - if gps_packet.data_header.data_type != Type.POSITION: - raise ValueError("Packet is not a position") - packet_body = gps_packet.body + return False else: - # this is the case where the bytes of the packet body are inputted and not the packet itself - packet_body = gps_packet - unpacked_tuple = struct.unpack(Position.gps_struct_string, packet_body) - new_position.timestamp = datetime.fromtimestamp(unpacked_tuple[0]) - new_position.latitude = unpacked_tuple[1] - new_position.longitude = unpacked_tuple[2] - new_position.altitude = unpacked_tuple[3] - new_position.speed = unpacked_tuple[4] - new_position.number_of_satellites = unpacked_tuple[5] - new_position.flight_state = unpacked_tuple[6] - new_position.set_validity() - - return new_position - - @staticmethod - def encode_position(timestamp: float, latitude: float, longitude: float, altitude: float, speed: float, - number_of_satellites: int, flight_state: FlightState) -> bytes: - return struct.pack(Position.gps_struct_string, timestamp, latitude, longitude, altitude, speed, - number_of_satellites, flight_state) + return True diff --git a/requirements.txt b/requirements.txt index d732901..276d6c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ pluggy==1.0.0 pyparsing==3.0.9 pytest==7.2.2 tomli==2.0.1 +typing-extensions==4.6.0 \ No newline at end of file diff --git a/tests/format/test_position.py b/tests/format/test_position.py index 3183228..5ee6c75 100644 --- a/tests/format/test_position.py +++ b/tests/format/test_position.py @@ -1,66 +1,44 @@ -from datetime import datetime -from EosLib.packet.packet import Packet -import EosLib.packet.transmit_header -import EosLib.packet.data_header -from EosLib.format.position import Position, FlightState - -import EosLib -from EosLib.packet import data_header, transmit_header -from EosLib.device import Device -from EosLib.packet import Packet -from EosLib.packet.definitions import Type - -def test_position_bytes(): - """Tests encode and decode functions - - :returns: If encode and decode work - :rtype: boolean - """ - current_time = datetime.now() - new_data = Position() - - # new_data.set_validity() - time = current_time - encoded_new_data = new_data.encode_position(time.timestamp(), 23.0, 24.0, 25.0, 26.0, 5, - FlightState.NOT_SET) - - decoded_new_data = Position.decode_position(encoded_new_data) - - assert time == decoded_new_data.timestamp - assert 23.0 == decoded_new_data.latitude - assert 24.0 == decoded_new_data.longitude - assert 25.0 == decoded_new_data.altitude - assert 26.0 == decoded_new_data.speed - assert 5 == decoded_new_data.number_of_satellites - assert FlightState.NOT_SET == decoded_new_data.flight_state - assert decoded_new_data.valid - - -def test_position_packet(): - """Tests encode and decode functions for packet input - - :returns: If encode and decode work - :rtype: boolean - """ - - position_data_header = EosLib.packet.data_header.DataHeader(Device.O3, Type.POSITION) - position_transmit_header = EosLib.packet.transmit_header.TransmitHeader(2) - - current_time = datetime.now() - new_data = Position() - - encoded_new_data = new_data.encode_position(current_time.timestamp(), 24.0, 25.0, 26.0, 27.0, 5, - FlightState.NOT_SET) - packet = Packet(encoded_new_data, position_data_header, position_transmit_header) - - decoded_new_data = Position.decode_position(packet) - - assert current_time == decoded_new_data.timestamp - assert 24.0 == decoded_new_data.latitude - assert 25.0 == decoded_new_data.longitude - assert 26.0 == decoded_new_data.altitude - assert 27.0 == decoded_new_data.speed - assert 5 == decoded_new_data.number_of_satellites - assert FlightState.NOT_SET == decoded_new_data.flight_state - assert decoded_new_data.valid - +import datetime + +from EosLib.format.position import Position +from EosLib.format.position import FlightState + + +def get_good_position(): + return Position(datetime.datetime.now(), + 33.7756, + 84.3963, + 974.5, + 70.2, + 5, + FlightState.DESCENT) + + +def test_encode_decode_bytes(): + base_position = get_good_position() + base_position_bytes = base_position.encode() + new_position = Position.decode(base_position_bytes) + valid_decode = base_position.gps_time == new_position.gps_time and \ + base_position.latitude == new_position.latitude and \ + base_position.longitude == new_position.longitude and \ + base_position.altitude == new_position.altitude and \ + base_position.speed == new_position.speed and \ + base_position.number_of_satellites == new_position.number_of_satellites and\ + base_position.flight_state == new_position.flight_state + print("loading") + assert valid_decode + + +def test_encode_decode_csv(): + base_position = get_good_position() + base_position_csv = base_position.encode_to_csv() + new_position = Position.decode_from_csv(base_position_csv) + valid_decode = base_position.gps_time == new_position.gps_time and \ + base_position.latitude == new_position.latitude and \ + base_position.longitude == new_position.longitude and \ + base_position.altitude == new_position.altitude and \ + base_position.speed == new_position.speed and \ + base_position.number_of_satellites == new_position.number_of_satellites and\ + base_position.flight_state == new_position.flight_state + print("loading") + assert valid_decode