Skip to content

Commit

Permalink
Add abstract format classes and update Position
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasmholder committed May 24, 2023
1 parent 75b33e8 commit a660524
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 115 deletions.
23 changes: 23 additions & 0 deletions EosLib/format/base_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import ABC
from abc import abstractmethod
from typing_extensions import Self

import EosLib


class BaseFormat(ABC):
@abstractmethod
def __init__(self):
raise NotImplementedError

@abstractmethod
def encode(self) -> bytes:
raise NotImplementedError

def encode_for_transmit(self) -> bytes:
return self.encode()[0:EosLib.packet.Packet.radio_body_max_bytes]

@classmethod
@abstractmethod
def decode(cls, data: bytes | EosLib.packet.Packet) -> Self:
raise NotImplementedError
20 changes: 20 additions & 0 deletions EosLib/format/csv_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from abc import ABC
from abc import abstractmethod
from typing_extensions import Self

from EosLib.format.base_format import BaseFormat


class CsvFormat(BaseFormat, ABC):
@abstractmethod
def get_csv_headers(self):
raise NotImplementedError

@abstractmethod
def encode_to_csv(self) -> str:
raise NotImplementedError

@classmethod
@abstractmethod
def decode_from_csv(cls, csv: str) -> Self:
raise NotImplementedError
129 changes: 80 additions & 49 deletions EosLib/format/position.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import datetime
import struct
from datetime import datetime
from enum import IntEnum
from typing_extensions import Self

from EosLib import Type
from EosLib.packet.packet import Packet
import EosLib
from EosLib.format.csv_format import CsvFormat
from EosLib.packet import Packet


class FlightState(IntEnum):
Expand All @@ -14,8 +16,8 @@ class FlightState(IntEnum):
DESCENT = 4


class Position:
# Struct format is: timestamp, lat, long, speed, altitude, number of satellites, flight state
class Position(CsvFormat):
# Struct format is: GPS timestamp, lat, long, speed, altitude, number of satellites, flight state
gps_struct_string = "!" \
"d" \
"d" \
Expand All @@ -25,51 +27,80 @@ class Position:
"H" \
"B"

def __init__(self):
self.local_time = None
self.timestamp = None
self.latitude = None
self.longitude = None
self.altitude = None
self.speed = None
self.number_of_satellites = None
self.valid = False
self.flight_state = FlightState.NOT_SET

# TODO: figure out a more legitimate way to check validity
def set_validity(self):
def __init__(self,
gps_time: datetime.datetime.now(),
latitude: float,
longitude: float,
speed: float,
altitude: float,
number_of_satellites: int,
flight_state: FlightState = FlightState.NOT_SET):
self.gps_time = gps_time
self.latitude = latitude
self.longitude = longitude
self.altitude = altitude
self.speed = speed
self.number_of_satellites = number_of_satellites
self.flight_state = flight_state

self.valid = self.get_validity()

def encode(self) -> bytes:
return struct.pack(self.gps_struct_string,
self.gps_time.timestamp(),
self.latitude,
self.longitude,
self.speed,
self.altitude,
self.number_of_satellites,
self.flight_state)

@classmethod
def decode(cls, data: bytes | Packet) -> Self:
if isinstance(data, Packet):
if data.data_header.data_type != EosLib.Type.POSITION:
raise ValueError("Attempted to decode a non-position packet using Position")
else:
data = data.body

unpacked_data = struct.unpack(cls.gps_struct_string, data)

return Position(datetime.datetime.fromtimestamp(unpacked_data[0]),
unpacked_data[1],
unpacked_data[2],
unpacked_data[3],
unpacked_data[4],
unpacked_data[5],
FlightState(unpacked_data[6]))

def get_csv_headers(self):
return ["GPS Timestamp", "Latitude", "Longitude", "Speed", "Altitude", "Number of Satellites", "Flight State"]

def encode_to_csv(self) -> str:
return ",".join([self.gps_time.isoformat(),
str(self.latitude),
str(self.longitude),
str(self.speed),
str(self.altitude),
str(self.number_of_satellites),
str(self.flight_state.value)])

@classmethod
def decode_from_csv(cls, csv: str) -> Self:
csv_list = csv.split(",")

return Position(datetime.datetime.fromisoformat(csv_list[0]),
float(csv_list[1]),
float(csv_list[2]),
float(csv_list[3]),
float(csv_list[4]),
int(csv_list[5]),
FlightState(int(csv_list[6])))

def get_validity(self):
if (self.number_of_satellites < 4 or
self.latitude == 0 or
self.longitude == 0):
self.valid = False
else:
self.valid = True

@staticmethod
def decode_position(gps_packet: Packet | bytes):
new_position = Position()

if isinstance(gps_packet, Packet):
if gps_packet.data_header.data_type != Type.POSITION:
raise ValueError("Packet is not a position")
packet_body = gps_packet.body
return False
else:
# this is the case where the bytes of the packet body are inputted and not the packet itself
packet_body = gps_packet
unpacked_tuple = struct.unpack(Position.gps_struct_string, packet_body)
new_position.timestamp = datetime.fromtimestamp(unpacked_tuple[0])
new_position.latitude = unpacked_tuple[1]
new_position.longitude = unpacked_tuple[2]
new_position.altitude = unpacked_tuple[3]
new_position.speed = unpacked_tuple[4]
new_position.number_of_satellites = unpacked_tuple[5]
new_position.flight_state = unpacked_tuple[6]
new_position.set_validity()

return new_position

@staticmethod
def encode_position(timestamp: float, latitude: float, longitude: float, altitude: float, speed: float,
number_of_satellites: int, flight_state: FlightState) -> bytes:
return struct.pack(Position.gps_struct_string, timestamp, latitude, longitude, altitude, speed,
number_of_satellites, flight_state)
return True
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pluggy==1.0.0
pyparsing==3.0.9
pytest==7.2.2
tomli==2.0.1
typing-extensions==4.6.0
110 changes: 44 additions & 66 deletions tests/format/test_position.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,44 @@
from datetime import datetime
from EosLib.packet.packet import Packet
import EosLib.packet.transmit_header
import EosLib.packet.data_header
from EosLib.format.position import Position, FlightState

import EosLib
from EosLib.packet import data_header, transmit_header
from EosLib.device import Device
from EosLib.packet import Packet
from EosLib.packet.definitions import Type

def test_position_bytes():
"""Tests encode and decode functions
:returns: If encode and decode work
:rtype: boolean
"""
current_time = datetime.now()
new_data = Position()

# new_data.set_validity()
time = current_time
encoded_new_data = new_data.encode_position(time.timestamp(), 23.0, 24.0, 25.0, 26.0, 5,
FlightState.NOT_SET)

decoded_new_data = Position.decode_position(encoded_new_data)

assert time == decoded_new_data.timestamp
assert 23.0 == decoded_new_data.latitude
assert 24.0 == decoded_new_data.longitude
assert 25.0 == decoded_new_data.altitude
assert 26.0 == decoded_new_data.speed
assert 5 == decoded_new_data.number_of_satellites
assert FlightState.NOT_SET == decoded_new_data.flight_state
assert decoded_new_data.valid


def test_position_packet():
"""Tests encode and decode functions for packet input
:returns: If encode and decode work
:rtype: boolean
"""

position_data_header = EosLib.packet.data_header.DataHeader(Device.O3, Type.POSITION)
position_transmit_header = EosLib.packet.transmit_header.TransmitHeader(2)

current_time = datetime.now()
new_data = Position()

encoded_new_data = new_data.encode_position(current_time.timestamp(), 24.0, 25.0, 26.0, 27.0, 5,
FlightState.NOT_SET)
packet = Packet(encoded_new_data, position_data_header, position_transmit_header)

decoded_new_data = Position.decode_position(packet)

assert current_time == decoded_new_data.timestamp
assert 24.0 == decoded_new_data.latitude
assert 25.0 == decoded_new_data.longitude
assert 26.0 == decoded_new_data.altitude
assert 27.0 == decoded_new_data.speed
assert 5 == decoded_new_data.number_of_satellites
assert FlightState.NOT_SET == decoded_new_data.flight_state
assert decoded_new_data.valid

import datetime

from EosLib.format.position import Position
from EosLib.format.position import FlightState


def get_good_position():
return Position(datetime.datetime.now(),
33.7756,
84.3963,
974.5,
70.2,
5,
FlightState.DESCENT)


def test_encode_decode_bytes():
base_position = get_good_position()
base_position_bytes = base_position.encode()
new_position = Position.decode(base_position_bytes)
valid_decode = base_position.gps_time == new_position.gps_time and \
base_position.latitude == new_position.latitude and \
base_position.longitude == new_position.longitude and \
base_position.altitude == new_position.altitude and \
base_position.speed == new_position.speed and \
base_position.number_of_satellites == new_position.number_of_satellites and\
base_position.flight_state == new_position.flight_state
print("loading")
assert valid_decode


def test_encode_decode_csv():
base_position = get_good_position()
base_position_csv = base_position.encode_to_csv()
new_position = Position.decode_from_csv(base_position_csv)
valid_decode = base_position.gps_time == new_position.gps_time and \
base_position.latitude == new_position.latitude and \
base_position.longitude == new_position.longitude and \
base_position.altitude == new_position.altitude and \
base_position.speed == new_position.speed and \
base_position.number_of_satellites == new_position.number_of_satellites and\
base_position.flight_state == new_position.flight_state
print("loading")
assert valid_decode

0 comments on commit a660524

Please sign in to comment.