diff --git a/exo/networking/grpc/test_network_interfaces.py b/exo/networking/grpc/test_network_interfaces.py new file mode 100644 index 000000000..71a1ec9ec --- /dev/null +++ b/exo/networking/grpc/test_network_interfaces.py @@ -0,0 +1,91 @@ +import subprocess +import json +import psutil +import time + +def get_interface_priority(interface_name, interface_type, hardware): + if 'thunderbolt' in hardware or 'thunderbolt' in interface_type: + return 3 + elif 'wi-fi' in interface_type or 'airport' in hardware: + return 2 + elif interface_name.startswith('en'): + return 1 + else: + return 0 + +def get_network_interfaces(): + if psutil.MACOS: + network_info = subprocess.check_output(['system_profiler', 'SPNetworkDataType', '-json']).decode('utf-8') + network_data = json.loads(network_info) + + interfaces = [] + + for interface in network_data.get('SPNetworkDataType', []): + interface_name = interface.get('interface') + interface_type = interface.get('type', '').lower() + hardware = interface.get('hardware', '').lower() + + print(f"Debug - Interface: {interface_name}, Type: {interface_type}, Hardware: {hardware}") + + if not interface_name: + continue + + priority = get_interface_priority(interface_name, interface_type, hardware) + interfaces.append((interface_name, priority)) + + # Sort interfaces by priority + interfaces.sort(key=lambda x: x[1], reverse=True) + + return interfaces + else: + print("This test is designed for macOS only.") + return [] + +def simulate_broadcast_receive(peer_id, interface, priority, known_peers): + print(f"\nReceived broadcast from peer {peer_id} on interface {interface} with priority {priority}") + + if peer_id in known_peers: + current_interface, current_priority = known_peers[peer_id] + if priority > current_priority: + print(f"Higher priority detected. Disconnecting from {current_interface} and reconnecting on {interface}") + known_peers[peer_id] = (interface, priority) + else: + print(f"Keeping existing connection on {current_interface} with priority {current_priority}") + else: + print(f"New peer discovered. Connecting on {interface}") + known_peers[peer_id] = (interface, priority) + +def run_tests(): + print("Testing network interface detection, prioritization, and reconnection logic") + interfaces = get_network_interfaces() + + # Test 1: Check if interfaces are detected and sorted as we want, I mean priority of course! + assert len(interfaces) > 0, "No interfaces detected" + assert all(interfaces[i][1] >= interfaces[i+1][1] for i in range(len(interfaces)-1)), "Interfaces are not correctly sorted by priority" + + print("\nPrioritized list of interfaces:") + for i, (interface, priority) in enumerate(interfaces, 1): + print(f"{i}. {interface} (Priority: {priority})") + + # Test 2: Simulate broadcast receives and reconnection + known_peers = {} + + # Simulate receiving broadcasts from the same peer on different interfaces + peer_id = "test_peer_1" + for interface, priority in interfaces: + simulate_broadcast_receive(peer_id, interface, priority, known_peers) + time.sleep(0.1) # Add a small delay + + # Test 3: Check if the peer is connected to the highest priority interface + assert peer_id in known_peers, "Peer was not added to known_peers" + final_interface, final_priority = known_peers[peer_id] + assert final_priority == max(priority for _, priority in interfaces), "Peer is not connected to the highest priority interface" + + print("\nFinal known peers:") + for peer_id, (interface, priority) in known_peers.items(): + print(f"Peer {peer_id}: Connected on {interface} with priority {priority}") + + print("\nAll tests passed successfully!") + +if __name__ == "__main__": + run_tests()