diff --git a/custom_router/router.py b/custom_router/router.py index ffbf0e7..da33fc8 100644 --- a/custom_router/router.py +++ b/custom_router/router.py @@ -3,6 +3,7 @@ import json from dataclasses import dataclass from datetime import datetime +from collections import deque @dataclass class Packet: @@ -12,12 +13,14 @@ class Packet: timestamp: datetime = datetime.now() class Router: - def __init__(self, router_id: str): + def __init__(self, router_id: str, max_capacity: int = 100): self.router_id = router_id self.routing_table: Dict[str, Dict[str, int]] = {} # {destination: {next_hop: cost}} self.neighbors: Dict[str, int] = {} # {neighbor_id: cost} - self.packet_queue: List[Packet] = [] + self.packet_queue: deque[Packet] = deque(maxlen=max_capacity) # FIFO queue with max capacity self.network_graph = nx.Graph() + self.max_capacity = max_capacity + self.packet_count: Dict[str, List[Packet]] = {} # {destination: [packets]} def add_neighbor(self, neighbor_id: str, cost: int): """Add a neighbor router with associated cost.""" @@ -32,23 +35,53 @@ def remove_neighbor(self, neighbor_id: str): self.network_graph.remove_edge(self.router_id, neighbor_id) self.update_routing_table() - def receive_packet(self, packet: Packet): + def receive_packet(self, packet: Packet) -> bool: """Receive a packet and either forward it or process it.""" + if len(self.packet_queue) >= self.max_capacity: + return False # Queue is full + if packet.destination == self.router_id: self.packet_queue.append(packet) + self._update_packet_count(packet) return True else: return self.forward_packet(packet) def forward_packet(self, packet: Packet) -> bool: - """Forward a packet to the next hop based on routing table.""" + """Forward a packet to the next hop based on routing table using FIFO.""" if packet.destination in self.routing_table: next_hop = min(self.routing_table[packet.destination].items(), key=lambda x: x[1])[0] # In a real implementation, this would send the packet to next_hop + self._update_packet_count(packet) return True return False + def _update_packet_count(self, packet: Packet): + """Update packet count for the destination.""" + if packet.destination not in self.packet_count: + self.packet_count[packet.destination] = [] + self.packet_count[packet.destination].append(packet) + + def get_packet_count(self, destination: str, start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None) -> int: + """Get count of packets for a destination within a timeframe.""" + # can this be done more optimally? + if destination not in self.packet_count: + return 0 + + packets = self.packet_count[destination] + if start_time is None and end_time is None: + return len(packets) + + filtered_packets = packets + if start_time: + filtered_packets = [p for p in filtered_packets if p.timestamp >= start_time] + if end_time: + filtered_packets = [p for p in filtered_packets if p.timestamp <= end_time] + + return len(filtered_packets) + def update_routing_table(self): """Update routing table using distance vector algorithm with split horizon.""" # Initialize routing table with direct neighbors @@ -68,4 +101,12 @@ def get_routing_table(self) -> Dict[str, Dict[str, int]]: def get_packet_queue(self) -> List[Packet]: """Return the current packet queue.""" - return self.packet_queue \ No newline at end of file + return list(self.packet_queue) + + def get_queue_size(self) -> int: + """Return the current size of the packet queue.""" + return len(self.packet_queue) + + def is_queue_full(self) -> bool: + """Check if the packet queue is full.""" + return len(self.packet_queue) >= self.max_capacity \ No newline at end of file