diff --git a/EosLib/format/__init__.py b/EosLib/format/__init__.py index 81eae89..fa1b3ce 100644 --- a/EosLib/format/__init__.py +++ b/EosLib/format/__init__.py @@ -1,4 +1,4 @@ -from EosLib.format.formats import telemetry_data, position +from EosLib.format.formats import telemetry_data, position, empty_format from EosLib.format.definitions import Type as Type diff --git a/EosLib/format/decode_factory.py b/EosLib/format/decode_factory.py index d1dbb19..0c4440e 100644 --- a/EosLib/format/decode_factory.py +++ b/EosLib/format/decode_factory.py @@ -1,12 +1,27 @@ +from EosLib.format import csv_format + + class DecodeFactory: def __init__(self): self._decoders = {} + self._csv_decoders = {} def register_decoder(self, format_class): self._decoders[format_class.get_format_type()] = format_class.decode + if issubclass(format_class, csv_format.CsvFormat): + self._csv_decoders[format_class.get_format_type()] = format_class.decode_from_csv def decode(self, data_format, data: bytes): - return self._decoders[data_format](data) + decoder = self._decoders.get(data_format) + if decoder is None: + raise TypeError("No decoder found for this type") + return decoder(data) + + def decode_from_csv(self, data_format, data: str): + decoder = self._csv_decoders.get(data_format) + if decoder is None: + raise TypeError("No decoder found for this type") + return decoder(data) decode_factory = DecodeFactory() diff --git a/EosLib/format/definitions.py b/EosLib/format/definitions.py index f443242..fa5e352 100644 --- a/EosLib/format/definitions.py +++ b/EosLib/format/definitions.py @@ -11,4 +11,5 @@ class Type(IntEnum): COMMAND = 5 RESPONSE = 6 TELEMETRY_DATA = 7 + EMPTY = 8 ERROR = 255 diff --git a/EosLib/format/formats/empty_format.py b/EosLib/format/formats/empty_format.py new file mode 100644 index 0000000..301506a --- /dev/null +++ b/EosLib/format/formats/empty_format.py @@ -0,0 +1,30 @@ +import struct + +from typing_extensions import Self + +from EosLib.format.base_format import BaseFormat + +from EosLib.format.definitions import Type + + +class EmptyFormat(BaseFormat): + + def __init__(self, num_bytes=0): + self.num_bytes = num_bytes + + def __eq__(self, other): + return self.num_bytes == other.num_bytes + + @staticmethod + def get_format_type() -> Type: + return Type.EMPTY + + def encode(self) -> bytes: + return struct.pack(self.get_format_string()) + + @classmethod + def decode(cls, data: bytes) -> Self: + return EmptyFormat(len(data)) + + def get_format_string(self) -> str: + return self.num_bytes * "x" diff --git a/EosLib/format/formats/position.py b/EosLib/format/formats/position.py index 810e694..f0de521 100644 --- a/EosLib/format/formats/position.py +++ b/EosLib/format/formats/position.py @@ -1,4 +1,6 @@ +import csv import datetime +import io import struct from enum import IntEnum from typing_extensions import Self @@ -86,7 +88,9 @@ def get_csv_headers(self): return ["GPS Timestamp", "Latitude", "Longitude", "Speed", "Altitude", "Number of Satellites", "Flight State"] def encode_to_csv(self) -> str: - return ",".join([self.gps_time.isoformat(), + output = io.StringIO() + writer = csv.writer(output) + writer.writerow([self.gps_time.isoformat(), str(self.latitude), str(self.longitude), str(self.speed), @@ -94,9 +98,12 @@ def encode_to_csv(self) -> str: str(self.number_of_satellites), str(self.flight_state.value)]) + return output.getvalue() + @classmethod - def decode_from_csv(cls, csv: str) -> Self: - csv_list = csv.split(",") + def decode_from_csv(cls, csv_string: str) -> Self: + reader = csv.reader([csv_string]) + csv_list = list(reader)[0] return Position(datetime.datetime.fromisoformat(csv_list[0]), float(csv_list[1]), diff --git a/EosLib/format/formats/telemetry_data.py b/EosLib/format/formats/telemetry_data.py index 74e8084..710c04b 100644 --- a/EosLib/format/formats/telemetry_data.py +++ b/EosLib/format/formats/telemetry_data.py @@ -1,3 +1,5 @@ +import csv +import io import struct from typing_extensions import Self @@ -88,16 +90,21 @@ def get_csv_headers(self): return ["temperature", "pressure", "humidity", "x_rotation", "y_rotation", "z_rotation"] def encode_to_csv(self) -> str: - return ",".join([str(self.temperature), + output = io.StringIO() + writer = csv.writer(output) + writer.writerow([str(self.temperature), str(self.pressure), str(self.humidity), str(self.x_rotation), str(self.y_rotation), str(self.z_rotation)]) + return output.getvalue() + @classmethod - def decode_from_csv(cls, csv: str) -> Self: - csv_list = csv.split(",") + def decode_from_csv(cls, csv_string: str) -> Self: + reader = csv.reader([csv_string]) + csv_list = list(reader)[0] return TelemetryData(float(csv_list[0]), float(csv_list[1]), diff --git a/EosLib/packet/data_header.py b/EosLib/packet/data_header.py index a5a8805..1348a9b 100644 --- a/EosLib/packet/data_header.py +++ b/EosLib/packet/data_header.py @@ -3,8 +3,8 @@ from datetime import datetime -import EosLib.format.definitions -from EosLib.packet import definitions +from EosLib.format.definitions import Type +from EosLib.packet.definitions import Priority from EosLib.packet.definitions import HeaderPreamble, old_data_headers from EosLib.packet.exceptions import PacketFormatError, DataHeaderFormatError from EosLib.device import Device @@ -22,8 +22,8 @@ class DataHeader: def __init__(self, sender: Device, - data_type: EosLib.format.definitions.Type = EosLib.format.definitions.Type.NO_TYPE, - priority: definitions.Priority = definitions.Priority.NO_TRANSMIT, + data_type: Type = Type.NO_TYPE, + priority: Priority = Priority.NO_TRANSMIT, destination: Device = Device.NO_DEVICE, generate_time: datetime = None ): diff --git a/EosLib/packet/packet.py b/EosLib/packet/packet.py index 8ef6529..8bf32c5 100644 --- a/EosLib/packet/packet.py +++ b/EosLib/packet/packet.py @@ -1,15 +1,12 @@ -import datetime import struct import EosLib.format.decode_factory from EosLib.format.base_format import BaseFormat -from EosLib.format.definitions import Type from EosLib.packet.transmit_header import TransmitHeader from EosLib.packet.data_header import DataHeader from EosLib.packet.definitions import HeaderPreamble, Priority from EosLib.packet.exceptions import PacketFormatError -from EosLib.device import Device class Packet: @@ -115,22 +112,6 @@ def encode(self) -> bytes: return packet_bytes - def encode_to_string(self): - """ Takes a packet and encodes it into a comma separated string - - :return: The comma separated string representation of the packet - """ - self.validate_packet() - - # It's easier if we make all the encoded packet string arrays the same length, so we add a fake transmit header - if self.transmit_header is None: - self.transmit_header = TransmitHeader(0, send_rssi=0) - - return "{transmit_header}, {data_header}, {body}".format( - transmit_header=self.transmit_header.encode_to_string(), - data_header=self.data_header.encode_to_string(), - body=self.body.encode().hex()) - @staticmethod def decode(packet_bytes: bytes): """Takes a bytes object and decodes it into a Packet object. @@ -162,34 +143,3 @@ def decode(packet_bytes: bytes): decoded_transmit_header) return decoded_packet - - @staticmethod - def decode_from_string(packet_string: str): - """Takes a string and decodes it into a Packet object. - - The format is this: sequence num, send time, rssi, data type, sender, priority, generate time, body - - :param packet_string: The string to be decoded - :return: The decoded Packet object - """ - - packet_array = packet_string.split(', ') - - send_seq_num = int(packet_array[0]) - send_rssi = int(packet_array[1]) - send_time = datetime.datetime.fromisoformat(packet_array[2]) - - sender = Device(int(packet_array[3])) - data_type = Type(int(packet_array[4])) - priority = Priority(int(packet_array[5])) - destination = Device(int(packet_array[6])) - generate_time = datetime.datetime.fromisoformat(packet_array[7]) - - decoded_transmit_header = TransmitHeader(send_seq_num, send_time, send_rssi) - decoded_data_header = DataHeader(sender, data_type, priority, destination, generate_time) - decoded_packet = Packet(EosLib.format.decode_factory.decode_factory.decode(data_type, - bytes.fromhex(packet_array[8])), - decoded_data_header, - decoded_transmit_header) - - return decoded_packet diff --git a/example.py b/example.py index 8d23ab6..32cf972 100644 --- a/example.py +++ b/example.py @@ -6,6 +6,7 @@ import EosLib.packet.packet import EosLib.packet.transmit_header from EosLib.packet.packet import DataHeader, TransmitHeader, Packet +from EosLib.format.formats.empty_format import EmptyFormat sequence_number = 0 @@ -18,17 +19,17 @@ def collect_data() -> int: def log_data(data): data_header = DataHeader( EosLib.device.Device.PRESSURE, - EosLib.format.Type.DATA, + EosLib.format.Type.EMPTY, EosLib.Priority.DATA ) - body = str(data) - body = body.encode() + # Please don't store your own data as number of empty bytes, make your own format. + body = EmptyFormat(data) created_packet = Packet(body, data_header) with open("TestData.dat", 'w') as f: - f.write(created_packet.encode_to_string()) + f.write(created_packet.encode().hex()) return created_packet diff --git a/tests/format/formats/__init__.py b/tests/format/formats/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/format/formats/csv_format_test.py b/tests/format/formats/csv_format_test.py new file mode 100644 index 0000000..3865771 --- /dev/null +++ b/tests/format/formats/csv_format_test.py @@ -0,0 +1,14 @@ +from abc import ABC + +from format.formats.format_test import CheckFormat + +from EosLib.format.decode_factory import decode_factory + + +class CheckCsvFormat(CheckFormat, ABC): + def test_encode_decode_csv(self): + base_format = self.get_good_format() + base_format_csv = base_format.encode_to_csv() + new_position = decode_factory.decode_from_csv(self.get_good_format().get_format_type(), base_format_csv) + + assert base_format == new_position diff --git a/tests/format/formats/format_test.py b/tests/format/formats/format_test.py new file mode 100644 index 0000000..9696781 --- /dev/null +++ b/tests/format/formats/format_test.py @@ -0,0 +1,51 @@ +import abc +import copy +import datetime + +import EosLib.format.csv_format +from EosLib.format.decode_factory import decode_factory + + +# Name can't contain the word Test, hence Check +class CheckFormat(abc.ABC): + @abc.abstractmethod + def get_format_from_list(self, format_list: []): + raise NotImplementedError + + @abc.abstractmethod + def get_good_format_list(self): + raise NotImplementedError + + def get_good_format(self): + return self.get_format_from_list(self.get_good_format_list()) + + def test_is_eq(self): + data_1 = self.get_good_format() + data_2 = self.get_good_format() + + assert data_1 == data_2 + + def test_not_eq(self): + test_passed = True + + data_1 = self.get_good_format() + + for i in range(len(self.get_good_format_list())): + new_data_list = copy.deepcopy(self.get_good_format_list()) + if isinstance(new_data_list[i], (int, float)): + new_data_list[i] += 1 + elif isinstance(new_data_list[i], datetime.datetime): + new_data_list[i] += datetime.timedelta(1) + data_2 = self.get_format_from_list(new_data_list) + + if data_1 == data_2: + test_passed = False + + assert test_passed + + def test_encode_decode_bytes(self): + base_format = self.get_good_format() + base_position_bytes = base_format.encode() + new_format = decode_factory.decode(self.get_good_format().get_format_type(), base_position_bytes) + assert base_format == new_format + diff --git a/tests/format/formats/test_empty_format.py b/tests/format/formats/test_empty_format.py new file mode 100644 index 0000000..bad9f25 --- /dev/null +++ b/tests/format/formats/test_empty_format.py @@ -0,0 +1,11 @@ +from EosLib.format.formats.empty_format import EmptyFormat + +from tests.format.formats.format_test import CheckFormat + + +class TestEmptyFormat(CheckFormat): + def get_format_from_list(self, format_list: []): + return EmptyFormat() + + def get_good_format_list(self): + return [] \ No newline at end of file diff --git a/tests/format/formats/test_position.py b/tests/format/formats/test_position.py new file mode 100644 index 0000000..9b58a8f --- /dev/null +++ b/tests/format/formats/test_position.py @@ -0,0 +1,47 @@ +import datetime + +from EosLib.format.formats.position import Position +from EosLib.format.formats.position import FlightState + +from format.formats.csv_format_test import CheckCsvFormat + +good_data_list = [datetime.datetime.now(), + 33.7756, + 84.3963, + 974.5, + 70.2, + 5, + FlightState.DESCENT] + + +def get_position_from_list(position_list: [float]): + return Position(position_list[0], + position_list[1], + position_list[2], + position_list[3], + position_list[4], + position_list[5], + position_list[6]) + + +def get_good_position(): + return get_position_from_list(good_data_list) + + +def test_get_validity_valid(): + good_position = get_good_position() + assert good_position.get_validity() + + +def test_get_validity_bad_satellites(): + bad_satellites_position = get_good_position() + bad_satellites_position.number_of_satellites = 3 + assert not bad_satellites_position.get_validity() + + +class TestPosition(CheckCsvFormat): + def get_format_from_list(self, format_list: []): + return get_position_from_list(format_list) + + def get_good_format_list(self): + return good_data_list diff --git a/tests/format/formats/test_telemetry_data.py b/tests/format/formats/test_telemetry_data.py new file mode 100644 index 0000000..a40d7b0 --- /dev/null +++ b/tests/format/formats/test_telemetry_data.py @@ -0,0 +1,43 @@ +from EosLib.format.formats.telemetry_data import TelemetryData + +from format.formats.csv_format_test import CheckCsvFormat + +good_data_list = [32.0, 1013.25, 50.5, 30.0, 45.0, 60.0] + + +def get_telemetry_data_from_list(data_list: [float]): + return TelemetryData(data_list[0], + data_list[1], + data_list[2], + data_list[3], + data_list[4], + data_list[5]) + + +def get_good_telemetry_data(): + return get_telemetry_data_from_list(good_data_list) + + +def test_get_validity_valid(): + good_data = get_good_telemetry_data() + assert good_data.get_validity() + + +def test_get_validity_bad_humidity(): + bad_humidity_data = get_good_telemetry_data() + bad_humidity_data.humidity = -1 + assert not bad_humidity_data.get_validity() + + +def test_get_validity_bad_pressure(): + bad_humidity_data = get_good_telemetry_data() + bad_humidity_data.pressure = -1 + assert not bad_humidity_data.get_validity() + + +class TestTelemetryData(CheckCsvFormat): + def get_format_from_list(self, format_list: []): + return get_telemetry_data_from_list(format_list) + + def get_good_format_list(self): + return good_data_list diff --git a/tests/format/test_position.py b/tests/format/test_position.py deleted file mode 100644 index 0e59af4..0000000 --- a/tests/format/test_position.py +++ /dev/null @@ -1,70 +0,0 @@ -import datetime - -from EosLib.format.formats.position import Position -from EosLib.format.formats.position import FlightState - -good_data_list = [datetime.datetime.now(), - 33.7756, - 84.3963, - 974.5, - 70.2, - 5, - FlightState.DESCENT] - - -def get_position_from_list(position_list: [float]): - return Position(position_list[0], - position_list[1], - position_list[2], - position_list[3], - position_list[4], - position_list[5], - position_list[6]) - - -def get_good_position(): - return get_position_from_list(good_data_list) - - -def test_encode_decode_bytes(): - base_position = get_good_position() - base_position_bytes = base_position.encode() - new_position = Position.decode(base_position_bytes) - assert base_position == new_position - - -def test_encode_decode_csv(): - base_position = get_good_position() - base_position_csv = base_position.encode_to_csv() - new_position = Position.decode_from_csv(base_position_csv) - - assert base_position == new_position - - -class TestEQ: - def test_is_eq(self): - data_1 = get_good_position() - data_2 = get_good_position() - - assert data_1 == data_2 - - def test_not_eq(self): - test_passed = True - - data_1 = get_good_position() - - for i in range(len(good_data_list)): - new_data_list = good_data_list - if isinstance(good_data_list[i], float): - new_data_list[i] += 1 - elif isinstance(good_data_list[i], datetime.datetime): - new_data_list[i] += datetime.timedelta(1) - elif isinstance(good_data_list[i], FlightState): - new_data_list[i] = FlightState.ASCENT - data_2 = get_position_from_list(new_data_list) - - if data_1 == data_2: - test_passed = False - - assert test_passed - diff --git a/tests/format/test_telemetry_data.py b/tests/format/test_telemetry_data.py deleted file mode 100644 index 43620f4..0000000 --- a/tests/format/test_telemetry_data.py +++ /dev/null @@ -1,57 +0,0 @@ -from EosLib.format.formats.telemetry_data import TelemetryData - - -good_data_list = [32.0, 1013.25, 50.5, 30.0, 45.0, 60.0] - - -def get_telemetry_data_from_list(data_list: [float]): - return TelemetryData(data_list[0], - data_list[1], - data_list[2], - data_list[3], - data_list[4], - data_list[5]) - - -def get_good_telemetry_data(): - return get_telemetry_data_from_list(good_data_list) - - -def test_encode_decode_bytes(): - base_data = get_good_telemetry_data() - encoded_new_data = base_data.encode() - new_data = TelemetryData.decode(encoded_new_data) - - assert base_data == new_data - - -def test_encode_decode_csv(): - base_data = get_good_telemetry_data() - encoded_new_data = base_data.encode() - new_data = TelemetryData.decode(encoded_new_data) - - assert base_data == new_data - - -class TestEQ: - def test_is_eq(self): - data_1 = get_good_telemetry_data() - data_2 = get_good_telemetry_data() - - assert data_1 == data_2 - - def test_not_eq(self): - test_passed = True - - data_1 = get_good_telemetry_data() - - for i in range(len(good_data_list)): - new_data_list = good_data_list - new_data_list[i] += 1 - data_2 = get_telemetry_data_from_list(new_data_list) - - if data_1 == data_2: - test_passed = False - - assert test_passed - diff --git a/tests/packet/test_packet.py b/tests/packet/test_packet.py index 37ce999..befa104 100644 --- a/tests/packet/test_packet.py +++ b/tests/packet/test_packet.py @@ -1,62 +1,32 @@ -import struct - -from typing_extensions import Self - import pytest import EosLib.format.base_format import EosLib.format.definitions import EosLib.packet.definitions as definitions +from EosLib.format.formats.empty_format import EmptyFormat + from datetime import datetime -from EosLib.format import Type from EosLib.packet.packet import TransmitHeader, DataHeader, Packet, PacketFormatError from EosLib.packet.exceptions import DataHeaderFormatError, TransmitHeaderFormatError from EosLib.device import Device -from EosLib.format.decode_factory import decode_factory as decode_factory - - -class MockFormat(EosLib.format.base_format.BaseFormat): - - def __init__(self, num_bytes): - self.num_bytes = num_bytes - - def __eq__(self, other): - return self.num_bytes == other.num_bytes - - @staticmethod - def get_format_type() -> Type: - return EosLib.format.Type.ERROR - - def encode(self) -> bytes: - return struct.pack(self.get_format_string()) - - @classmethod - def decode(cls, data: bytes) -> Self: - return MockFormat(len(data)) - - def get_format_string(self) -> str: - return self.num_bytes * "x" - - -decode_factory.register_decoder(MockFormat) def get_valid_packet(): transmit_header = TransmitHeader(0, datetime.now(), 0) data_header = DataHeader(Device.GPS, - EosLib.format.definitions.Type.ERROR, + EosLib.format.definitions.Type.EMPTY, definitions.Priority.TELEMETRY, Device.GPS, datetime.now()) - return Packet(MockFormat(0), data_header, transmit_header) + return Packet(EmptyFormat(0), data_header, transmit_header) def test_minimal_constructor(): data_header = DataHeader(Device.GPS) - packet = Packet(MockFormat(0), data_header) + packet = Packet(EmptyFormat(0), data_header) packet.encode() @@ -171,7 +141,7 @@ def test_validate_empty_body_packet(packet): def test_body_too_large(packet): - packet.body = MockFormat(Packet.radio_body_max_bytes + 1) + packet.body = EmptyFormat(Packet.radio_body_max_bytes + 1) with pytest.raises(PacketFormatError): packet.encode() @@ -185,14 +155,14 @@ def test_illegal_body_type(packet, illegal_body): def test_allow_large_body_no_transmit(packet): - packet.body = MockFormat(Packet.radio_body_max_bytes + 1) + packet.body = EmptyFormat(Packet.radio_body_max_bytes + 1) packet.data_header.priority = definitions.Priority.NO_TRANSMIT assert packet.encode() def test_max_body_size(packet): - packet.body = MockFormat(Packet.radio_body_max_bytes) + packet.body = EmptyFormat(Packet.radio_body_max_bytes) assert packet.encode() @@ -231,22 +201,6 @@ def test_encode_decode_data_only_packet(packet): assert model_packet == decoded_packet -def test_encode_and_decode_string(packet): - test_string = packet.encode_to_string() - decoded_packet = Packet.decode_from_string(test_string) - - assert decoded_packet == packet - - -def test_encode_string_no_tx_header(packet): - packet.transmit_header = None - - test_string = packet.encode_to_string() - decoded_packet = Packet.decode_from_string(test_string) - - assert decoded_packet == packet - - def test_old_data_header_version(packet): packet.transmit_header = None @@ -268,7 +222,7 @@ def test_packet_print_two_headers(packet): "\tRSSI: 0\n" \ "Data Header:\n" \ "\tSender: GPS\n" \ - "\tData type: ERROR\n" \ + "\tData type: EMPTY\n" \ "\tPriority: TELEMETRY\n" \ "\tDestination: GPS\n" \ "\tGenerate Time: 2001-01-07 01:23:45\n" \ @@ -286,7 +240,7 @@ def test_packet_data_header_only(packet): expected_string = "No transmit header\n" \ "Data Header:\n" \ "\tSender: GPS\n" \ - "\tData type: ERROR\n" \ + "\tData type: EMPTY\n" \ "\tPriority: TELEMETRY\n" \ "\tDestination: GPS\n" \ "\tGenerate Time: 2001-01-07 01:23:45\n" \ @@ -321,7 +275,7 @@ def test_packet_print_no_body(packet): "\tRSSI: 0\n" \ "Data Header:\n" \ "\tSender: GPS\n" \ - "\tData type: ERROR\n" \ + "\tData type: EMPTY\n" \ "\tPriority: TELEMETRY\n" \ "\tDestination: GPS\n" \ "\tGenerate Time: 2001-01-07 01:23:45\n" \