Skip to content

Commit b9426c5

Browse files
committed
optimize parse data for lossy telemetry data
1 parent 12af3e3 commit b9426c5

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

Backend/core/comms.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from multiprocessing.managers import BaseManager
1414
from . import db
1515
from file_sync.file_sync_down.main import *
16+
import re
1617

1718
format_string = '<' # little-endian
1819
byte_length = 0
@@ -241,35 +242,40 @@ async def remote_db_fetch(self, server_url: str):
241242

242243
def parse_packets(self, new_data: bytes, tmp_source: str):
243244
"""
244-
Parse and check the length of each packet
245-
:param new_data: Newly received bytes from the comm channel
246-
:param tmp_source: Name of tmp data source, put comm channel name here e.g. tcp, lte
245+
Parse and check the length of each packet.
246+
247+
:param new_data: Newly received bytes from the comm channel.
248+
:param tmp_source: Name of tmp data source, put comm channel name here e.g. tcp, lte.
247249
"""
248-
header = b'<bsr>'
249-
footer = b'</bsr>'
250+
header = b"<bsr>"
251+
footer = b"<bsr"
252+
if tmp_source not in self.__tmp_data:
253+
self.__tmp_data[tmp_source] = b''
254+
255+
# Append new data to the temporary buffer
250256
self.__tmp_data[tmp_source] += new_data
257+
258+
# Regex pattern to match packets with <bsr> and </bsr> tags
259+
pattern = re.compile(b'<bsr>(.*?)</bsr>', re.DOTALL)
260+
251261
packets = []
252262
while True:
253-
# Search for the next complete data packet
254-
try:
255-
start_index = self.__tmp_data[tmp_source].index(header)
256-
end_index = self.__tmp_data[tmp_source].index(footer)
257-
except ValueError:
263+
match = pattern.search(self.__tmp_data[tmp_source])
264+
if not match:
258265
break
266+
# Extract the packet data
267+
packet = match.group(1)
268+
#remove headers and footers
269+
packets.append(packet)
259270

260-
# Extract a complete data packet
261-
packets.append(self.__tmp_data[tmp_source][start_index + len(header):end_index])
262-
# Update the remaining data to exclude the processed packet
263-
self.__tmp_data[tmp_source] = self.__tmp_data[tmp_source][end_index + len(footer):]
264-
265-
# If the remaining data is longer than the expected packet length,
266-
# there might be an incomplete packet, so log a warning.
267-
if len(self.__tmp_data[tmp_source]) >= byte_length:
268-
print(f"Source: {tmp_source}: Warning: Incomplete or malformed packet ------------------------------------")
269-
self.__tmp_data[tmp_source] = b''
271+
if match.start(0) != 0:
272+
print(f"skipping {match.start(0)} bytes")
273+
# Remove the processed packet from the temporary buffer
274+
self.__tmp_data[tmp_source] = self.__tmp_data[tmp_source][match.end():]
270275

271276
return packets
272277

278+
273279
def fs_down_callback(self, data):
274280
# copied from listen_upd()
275281
if not data:
@@ -301,11 +307,11 @@ def sigint_handler(signal, frame):
301307

302308
def start_comms():
303309
# start file sync
304-
p.start()
310+
# p.start()
305311

306312
# Start two live comm channels
307-
vps_thread = threading.Thread(target=lambda : asyncio.run(telemetry.remote_db_fetch(config.VPS_URL)))
308-
vps_thread.start()
313+
#vps_thread = threading.Thread(target=lambda : asyncio.run(telemetry.remote_db_fetch(config.VPS_URL)))
314+
#vps_thread.start()
309315
#socket_thread = threading.Thread(target=lambda: telemetry.listen_udp(config.UDP_PORT))
310316
#socket_thread.start()
311317
socket_thread = threading.Thread(target=lambda: telemetry.serial_read())

0 commit comments

Comments
 (0)