From 9fa20caabc15918df99b9a0806a5091021dc7d5b Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Fri, 11 Apr 2025 17:49:06 -0700 Subject: [PATCH] validate each device individually --- pylabrobot/io/capture.py | 29 ++++++++++++++++------------- pylabrobot/io/ftdi.py | 26 +++++++++++++------------- pylabrobot/io/serial.py | 8 ++++---- pylabrobot/io/usb.py | 4 ++-- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/pylabrobot/io/capture.py b/pylabrobot/io/capture.py index d48b3a9c76..7b802948c7 100644 --- a/pylabrobot/io/capture.py +++ b/pylabrobot/io/capture.py @@ -83,34 +83,37 @@ def capture_active(self): class CaptureReader: def __init__(self, path: str): self.path = path - self.commands: List[dict] = [] + self.commands: dict[str, List[dict]] = {} with open(path, "r") as f: data = json.load(f) for c in data["commands"]: - self.commands.append(c) - self._command_idx = 0 + if c.get("device_id") not in self.commands: + self.commands[c["device_id"]] = [] + self.commands[c["device_id"]].append(c) + self._command_idx = {k: 0 for k in self.commands.keys()} def start(self): global _capture_or_validation_active _capture_or_validation_active = True - def next_command(self) -> dict: - command = self.commands[self._command_idx] - self._command_idx += 1 + def next_command(self, device_id) -> dict: + print("getting next command for device", device_id) + command = self.commands[device_id][self._command_idx[device_id]] + self._command_idx[device_id] += 1 return command def done(self): - if self._command_idx < len(self.commands): - left = len(self.commands) - self._command_idx - next_command = self.commands[self._command_idx] - raise ValidationError( - f"Log file not fully read, {left} lines left. First command: {next_command}" - ) + # if self._command_idx < len(self.commands): + # left = len(self.commands) - self._command_idx + # next_command = self.commands[self._command_idx] + # raise ValidationError( + # f"Log file not fully read, {left} lines left. First command: {next_command}" + # ) print("Validation successful!") self.reset() def reset(self): - self._command_idx = 0 + self._command_idx = {k: 0 for k in self.commands.keys()} global _capture_or_validation_active _capture_or_validation_active = True diff --git a/pylabrobot/io/ftdi.py b/pylabrobot/io/ftdi.py index 097144920b..f71918f29e 100644 --- a/pylabrobot/io/ftdi.py +++ b/pylabrobot/io/ftdi.py @@ -142,7 +142,7 @@ async def setup(self): pass def set_baudrate(self, baudrate: int): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -152,7 +152,7 @@ def set_baudrate(self, baudrate: int): raise ValidationError(f"Next line is {next_command}, expected FTDI set_baudrate {baudrate}") def set_rts(self, level: bool): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -162,7 +162,7 @@ def set_rts(self, level: bool): raise ValidationError(f"Next line is {next_command}, expected FTDI set_rts {level}") def set_dtr(self, level: bool): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -172,7 +172,7 @@ def set_dtr(self, level: bool): raise ValidationError(f"Next line is {next_command}, expected FTDI set_dtr {level}") def usb_reset(self): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -183,7 +183,7 @@ def usb_reset(self): ) def set_latency_timer(self, latency: int): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -195,7 +195,7 @@ def set_latency_timer(self, latency: int): ) def set_line_property(self, bits: int, stopbits: int, parity: int): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -207,7 +207,7 @@ def set_line_property(self, bits: int, stopbits: int, parity: int): ) def set_flowctrl(self, flowctrl: int): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -217,7 +217,7 @@ def set_flowctrl(self, flowctrl: int): raise ValidationError(f"Next line is {next_command}, expected FTDI set_flowctrl {flowctrl}") def usb_purge_rx_buffer(self): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -228,7 +228,7 @@ def usb_purge_rx_buffer(self): ) def usb_purge_tx_buffer(self): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -239,7 +239,7 @@ def usb_purge_tx_buffer(self): ) def poll_modem_status(self) -> int: - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -251,7 +251,7 @@ def poll_modem_status(self) -> int: return int(next_command.data) def write(self, data: bytes): - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -263,7 +263,7 @@ def write(self, data: bytes): raise ValidationError("Data mismatch: difference was written to stdout.") def read(self, num_bytes: int = 1) -> bytes: - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id @@ -274,7 +274,7 @@ def read(self, num_bytes: int = 1) -> bytes: return bytes.fromhex(next_command.data) def readline(self) -> bytes: # type: ignore # very dumb it's reading from pyserial - next_command = FTDICommand(**self.cr.next_command()) + next_command = FTDICommand(**self.cr.next_command(self._device_id)) if not ( next_command.module == "ftdi" and next_command.device_id == self._device_id diff --git a/pylabrobot/io/serial.py b/pylabrobot/io/serial.py index b96e12c5c4..87759e8116 100644 --- a/pylabrobot/io/serial.py +++ b/pylabrobot/io/serial.py @@ -152,7 +152,7 @@ async def setup(self): pass def write(self, data: bytes): - next_command = SerialCommand(**self.cr.next_command()) + next_command = SerialCommand(**self.cr.next_command(self._port)) if not ( next_command.module == "serial" and next_command.device_id == self._port @@ -164,18 +164,18 @@ def write(self, data: bytes): raise ValidationError("Data mismatch: difference was written to stdout.") def read(self, num_bytes: int = 1) -> bytes: - next_command = SerialCommand(**self.cr.next_command()) + next_command = SerialCommand(**self.cr.next_command(self._port)) if not ( next_command.module == "serial" and next_command.device_id == self._port and next_command.action == "read" - and len(next_command.data) == num_bytes + and len(next_command.data) <= num_bytes # we can actually read less bytes than requested ): raise ValidationError(f"Next line is {next_command}, expected Serial read {num_bytes}") return next_command.data.encode() def readline(self) -> bytes: # type: ignore # very dumb it's reading from pyserial - next_command = SerialCommand(**self.cr.next_command()) + next_command = SerialCommand(**self.cr.next_command(self._port)) if not ( next_command.module == "serial" and next_command.device_id == self._port diff --git a/pylabrobot/io/usb.py b/pylabrobot/io/usb.py index e6b8f37bcb..643720fa1c 100644 --- a/pylabrobot/io/usb.py +++ b/pylabrobot/io/usb.py @@ -316,7 +316,7 @@ async def setup(self): pass def write(self, data: bytes, timeout: Optional[float] = None): - next_command = USBCommand(**self.cr.next_command()) + next_command = USBCommand(**self.cr.next_command(self._unique_id)) if not ( next_command.module == "usb" and next_command.device_id == self._unique_id @@ -328,7 +328,7 @@ def write(self, data: bytes, timeout: Optional[float] = None): raise ValidationError("Data mismatch: difference was written to stdout.") def read(self, timeout: Optional[float] = None) -> bytes: - next_command = USBCommand(**self.cr.next_command()) + next_command = USBCommand(**self.cr.next_command(self._unique_id)) if not ( next_command.module == "usb" and next_command.device_id == self._unique_id